|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
#ifdef _DEBUG
|
|
Toshihiro Shimizu |
890ddd |
#define _STLP_DEBUG 1
|
|
Toshihiro Shimizu |
890ddd |
#endif
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// TnzCore includes
|
|
Toshihiro Shimizu |
890ddd |
#include "tthreadmessage.h"
|
|
Toshihiro Shimizu |
890ddd |
#include "tsystem.h"
|
|
Toshihiro Shimizu |
890ddd |
#include "tatomicvar.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
#include "tthread.h"
|
|
Campbell Barton |
d0e335 |
#include "tthreadp.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// STL includes
|
|
Toshihiro Shimizu |
890ddd |
#include <set>
|
|
Toshihiro Shimizu |
890ddd |
#include <deque>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// tcg includes
|
|
Toshihiro Shimizu |
890ddd |
#include "tcg/tcg_pool.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Qt includes
|
|
Toshihiro Shimizu |
890ddd |
#include <QMultiMap>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMutableMapIterator>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMutex>
|
|
Toshihiro Shimizu |
890ddd |
#include <QWaitCondition>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMetaType>
|
|
Toshihiro Shimizu |
890ddd |
#include <QCoreApplication>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================================================
|
|
Toshihiro Shimizu |
890ddd |
// Paradigms of the Executor tasks management
|
|
Toshihiro Shimizu |
890ddd |
//--------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Basics:
|
|
Toshihiro Shimizu |
890ddd |
// * Tasks added by Executors are always stored in a global QMultiMap first -
|
|
Toshihiro Shimizu |
890ddd |
// ordering primarily being the schedulingPriority(), and insertion instant
|
|
Toshihiro Shimizu |
890ddd |
// (implicit) when they have the same scheduling priority.
|
|
Toshihiro Shimizu |
890ddd |
// * The QMultiMap needs to be reverse-iterated to do that, since values with
|
|
Toshihiro Shimizu |
890ddd |
// the same key are ordered from the most recent to the oldest one (see Qt's
|
|
Toshihiro Shimizu |
890ddd |
// manual).
|
|
Toshihiro Shimizu |
890ddd |
// * Worker threads are stored in a global set.
|
|
Toshihiro Shimizu |
890ddd |
// * When a task is added or a task has been performed, the workers list is
|
|
Toshihiro Shimizu |
890ddd |
// refreshed, possibly adding new Workers for some executable tasks.
|
|
Shinya Kitaoka |
120a6e |
// * When a worker ends a task, it automatically takes a new one before
|
|
Shinya Kitaoka |
120a6e |
// refreshing
|
|
Toshihiro Shimizu |
890ddd |
// the workers list. If no task can be taken, by default the thread exits and
|
|
Toshihiro Shimizu |
890ddd |
// invokes its own destruction.
|
|
Toshihiro Shimizu |
890ddd |
// * The thread may instead be put to rest if explicitly told by the user with
|
|
Toshihiro Shimizu |
890ddd |
// the appropriate method.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Default execution conditions:
|
|
Shinya Kitaoka |
120a6e |
// * A task is executable if, by default, its task load added to the sum of
|
|
Shinya Kitaoka |
120a6e |
// that of
|
|
Shinya Kitaoka |
120a6e |
// all other active tasks does not exceed the available resources of the
|
|
Shinya Kitaoka |
120a6e |
// system
|
|
Toshihiro Shimizu |
890ddd |
// (i.e.: 100 * # of cores of the machine).
|
|
Shinya Kitaoka |
120a6e |
// In other words, in every instant of execution, the sum of all active
|
|
Shinya Kitaoka |
120a6e |
// task's loads
|
|
Toshihiro Shimizu |
890ddd |
// never exceeds the available machine resources.
|
|
Shinya Kitaoka |
120a6e |
// * When such default execution condition is not met when attempting to take
|
|
Shinya Kitaoka |
120a6e |
// the
|
|
Shinya Kitaoka |
120a6e |
// task, no other task is taken instead - we wait until enough resources have
|
|
Shinya Kitaoka |
120a6e |
// been
|
|
Toshihiro Shimizu |
890ddd |
// freed before attempting to take the same task again.
|
|
Toshihiro Shimizu |
890ddd |
// In other words, the default execution condition is *BLOCKING*.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Custom execution conditions:
|
|
Shinya Kitaoka |
120a6e |
// * The user may decide to impose more tight conditions for tasks added by a
|
|
Shinya Kitaoka |
120a6e |
// certain
|
|
Toshihiro Shimizu |
890ddd |
// Executor. Let's call such conditions 'custom' conditions.
|
|
Shinya Kitaoka |
120a6e |
// * Custom conditions are always tested *AFTER* the default ones (on the same
|
|
Shinya Kitaoka |
120a6e |
// task).
|
|
Toshihiro Shimizu |
890ddd |
// This is necessary to enforce the global scheduling priorities mechanism.
|
|
Shinya Kitaoka |
120a6e |
// * When no task of a certain executor is active, custom conditions are always
|
|
Shinya Kitaoka |
120a6e |
// considered
|
|
Toshihiro Shimizu |
890ddd |
// satisfied.
|
|
Shinya Kitaoka |
120a6e |
// * If custom conditions are not met, we enqueue the task in an
|
|
Shinya Kitaoka |
120a6e |
// Executor-private queue
|
|
Shinya Kitaoka |
120a6e |
// for later execution and remove it from the global tasks queue - making it
|
|
Shinya Kitaoka |
120a6e |
// possible
|
|
Shinya Kitaoka |
120a6e |
// to perform other tasks before our custom-failed one (such operation will
|
|
Shinya Kitaoka |
120a6e |
// be called
|
|
Toshihiro Shimizu |
890ddd |
// "ACCUMULATION").
|
|
Shinya Kitaoka |
120a6e |
// In other words, custom conditions are *NOT BLOCKING* among different
|
|
Shinya Kitaoka |
120a6e |
// Executors.
|
|
Shinya Kitaoka |
120a6e |
// * Tasks in the last task's Executor-private queue are always polled before
|
|
Shinya Kitaoka |
120a6e |
// those in
|
|
Shinya Kitaoka |
120a6e |
// the global queue by Workers which ended a task, and this is done in a
|
|
Shinya Kitaoka |
120a6e |
// *BLOCKING* way
|
|
Toshihiro Shimizu |
890ddd |
// inside the same Executor.
|
|
Shinya Kitaoka |
120a6e |
// * When an Executor-private tasks queue is not empty, all the other tasks
|
|
Shinya Kitaoka |
120a6e |
// added by the
|
|
Toshihiro Shimizu |
890ddd |
// same executor are scheduled in the queue.
|
|
Shinya Kitaoka |
120a6e |
// In other words, the order of execution is always that of global insertion,
|
|
Shinya Kitaoka |
120a6e |
// *inside
|
|
Toshihiro Shimizu |
890ddd |
// the same executor*.
|
|
Shinya Kitaoka |
120a6e |
// * Tasks polled from an Executor-private queue (which therefore satisfied
|
|
Shinya Kitaoka |
120a6e |
// custom conditions)
|
|
Shinya Kitaoka |
120a6e |
// may still fail with default execution conditions. If that happens, we put
|
|
Shinya Kitaoka |
120a6e |
// the task
|
|
Shinya Kitaoka |
120a6e |
// back into the global queue with highest possible priority (timedOut) and
|
|
Shinya Kitaoka |
120a6e |
// the worker dies.
|
|
Shinya Kitaoka |
120a6e |
// Tasks with this special priority are polled *before every other task*. So,
|
|
Shinya Kitaoka |
120a6e |
// again, default
|
|
Toshihiro Shimizu |
890ddd |
// conditions are *BLOCKING*.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Thread-safety:
|
|
Shinya Kitaoka |
120a6e |
// * Most of the following code is mutex-protected, although it might seem not -
|
|
Shinya Kitaoka |
120a6e |
// indeed,
|
|
Shinya Kitaoka |
120a6e |
// only *one* mutex is locked and unlocked all of the time. This 'transition
|
|
Shinya Kitaoka |
120a6e |
// mutex' is
|
|
Shinya Kitaoka |
120a6e |
// the key to thread-safety: we're considered to lie in a 'transition state'
|
|
Shinya Kitaoka |
120a6e |
// if we are
|
|
Shinya Kitaoka |
120a6e |
// operating outside the run() of some task - which covers almost the
|
|
Shinya Kitaoka |
120a6e |
// totality of the code.
|
|
Shinya Kitaoka |
120a6e |
// * The transition mutex is *not* recursive. The reason is that threads can
|
|
Shinya Kitaoka |
120a6e |
// not wait on
|
|
Shinya Kitaoka |
120a6e |
// QWaitConditions if the mutex is recursive. That makes it necessary (and
|
|
Shinya Kitaoka |
120a6e |
// welcome) to put
|
|
Shinya Kitaoka |
120a6e |
// mutex lockers in strategic points of the code - in many low-level
|
|
Shinya Kitaoka |
120a6e |
// functions no mutex
|
|
Shinya Kitaoka |
120a6e |
// is locked *because some caller already did it before*. If you're modifying
|
|
Shinya Kitaoka |
120a6e |
// the code
|
|
Shinya Kitaoka |
120a6e |
// always trace back the callers of a function before inserting misleading
|
|
Shinya Kitaoka |
120a6e |
// mutex lockers.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================
|
|
Toshihiro Shimizu |
890ddd |
// TODO list
|
|
Toshihiro Shimizu |
890ddd |
//------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// * Improve dedicated threads support: make sure that tasks added to a
|
|
Shinya Kitaoka |
120a6e |
// dedicated
|
|
Shinya Kitaoka |
120a6e |
// executor are directly moved to the accumulation queue. The
|
|
Shinya Kitaoka |
120a6e |
// setDedicatedThreads()
|
|
Toshihiro Shimizu |
890ddd |
// method must therefore react accordingly.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * It could be possible to implement a dependency-based mechanism...
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// * Should the hosting thread wait for worker ones upon ExecutorImp
|
|
Shinya Kitaoka |
120a6e |
// destruction??
|
|
Toshihiro Shimizu |
890ddd |
// It could be a problem on some forever-waiting tasks...
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// * Double-check that, after the latest changes, the ExecutorIds persist when
|
|
Shinya Kitaoka |
120a6e |
// the pointers
|
|
Toshihiro Shimizu |
890ddd |
// are passed in takeTask and refreshAss..
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
using namespace TThread;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// Class-code definition for TThread::Runnable, made through the project's
// DEFINE_CLASS_CODE macro (defined outside this file).
// NOTE(review): presumably this supplies the m_classCode that Runnable's
// constructor forwards to TSmartObject; the meaning of the literal 21 is not
// visible from here - confirm against the macro definition.
DEFINE_CLASS_CODE(TThread::Runnable, 21)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==========================================
|
|
Toshihiro Shimizu |
890ddd |
// Global init() initializer function
|
|
Toshihiro Shimizu |
890ddd |
//------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void TThread::init() {
|
|
Shinya Kitaoka |
120a6e |
Executor::init();
|
|
Shinya Kitaoka |
120a6e |
TThreadMessageDispatcher::init();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! Global shutdown entry point: simply forwards to Executor::shutdown().
void TThread::shutdown() {
  Executor::shutdown();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
namespace TThread {
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Worker Thread class
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! A Worker is a specialized QThread that continuously polls Runnable
|
|
Toshihiro Shimizu |
890ddd |
//! tasks from a global execution queue to make them work.
|
|
Shinya Kitaoka |
d1f6c4 |
class Worker final : public QThread {
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Shinya Kitaoka |
120a6e |
RunnableP m_task;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
TSmartPointerT<executorid> m_master;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
bool m_exit;
|
|
Shinya Kitaoka |
120a6e |
QWaitCondition m_waitCondition;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
Worker();
|
|
Shinya Kitaoka |
120a6e |
~Worker();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
473e70 |
void run() override;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void takeTask();
|
|
Shinya Kitaoka |
120a6e |
inline bool canAdopt(const RunnableP &task);
|
|
Shinya Kitaoka |
120a6e |
inline void adoptTask(RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void rest();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void updateCountsOnTake();
|
|
Shinya Kitaoka |
120a6e |
inline void updateCountsOnRelease();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void onFinish();
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//========================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorId class
|
|
Toshihiro Shimizu |
890ddd |
//------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Contains all the executor data that need to persist until all tasks
|
|
Toshihiro Shimizu |
890ddd |
//! added by the executor have been processed. Upon creation of an Executor
|
|
Toshihiro Shimizu |
890ddd |
//! object, it instantiates one ExecutorId for both storing the Executor's data
|
|
Toshihiro Shimizu |
890ddd |
//! sensible to the underlying code of the Executor manager, and to identify all
|
|
Toshihiro Shimizu |
890ddd |
//! tasks added through it - by copying the smart pointer to the id into each
|
|
Toshihiro Shimizu |
890ddd |
//! added task.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa Executor and Runnable class.
|
|
Shinya Kitaoka |
d1f6c4 |
class ExecutorId final : public TSmartObject {
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Shinya Kitaoka |
120a6e |
size_t m_id;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int m_activeTasks;
|
|
Shinya Kitaoka |
120a6e |
int m_maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int m_activeLoad;
|
|
Shinya Kitaoka |
120a6e |
int m_maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
bool m_dedicatedThreads;
|
|
Shinya Kitaoka |
120a6e |
bool m_persistentThreads;
|
|
Shinya Kitaoka |
120a6e |
std::deque<worker *=""> m_sleepings;</worker>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
ExecutorId();
|
|
Shinya Kitaoka |
120a6e |
~ExecutorId();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void accumulate(const RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void newWorker(RunnableP &task);
|
|
Shinya Kitaoka |
120a6e |
void refreshDedicatedList();
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Executor::Imp class
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! ExecutorImp both manages the allocation of worker threads for the
|
|
Toshihiro Shimizu |
890ddd |
//! execution of Runnable tasks and centralizes the tasks collection.
|
|
Toshihiro Shimizu |
890ddd |
//! One process only hosts one instance of the ExecutorImp class as a
|
|
Toshihiro Shimizu |
890ddd |
//! a global variable that needs to be allocated in an application-lasting
|
|
Toshihiro Shimizu |
890ddd |
//! and event-looped thread - typically the main thread in GUI applications.
|
|
Shinya Kitaoka |
120a6e |
class ExecutorImp {
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Shinya Kitaoka |
120a6e |
QMultiMap<int, runnablep=""> m_tasks;</int,>
|
|
Shinya Kitaoka |
120a6e |
std::set<worker *=""> m_workers; // Used just for debugging purposes</worker>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
tcg::indices_pool<> m_executorIdPool;
|
|
Shinya Kitaoka |
120a6e |
std::vector<uchar> m_waitingFlagsPool;</uchar>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int m_activeLoad;
|
|
Shinya Kitaoka |
120a6e |
int m_maxLoad;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
QMutex m_transitionMutex; // Workers' transition mutex
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
ExecutorImp();
|
|
Shinya Kitaoka |
120a6e |
~ExecutorImp();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void insertTask(int schedulingPriority, RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void refreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline bool isExecutable(RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
} // namespace TThread
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Global variables
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
namespace {
|
|
Shinya Kitaoka |
120a6e |
ExecutorImp *globalImp = 0;
|
|
Toshihiro Shimizu |
890ddd |
ExecutorImpSlots *globalImpSlots = 0;
|
|
Shinya Kitaoka |
120a6e |
bool shutdownVar = false;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=============================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorImp methods
|
|
Toshihiro Shimizu |
890ddd |
//-----------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Starts with no active load; the maximum load is 100 times the processor
//! count reported by TSystem::getProcessorCount().
ExecutorImp::ExecutorImp()
    : m_activeLoad(0)
    , m_maxLoad(100 * TSystem::getProcessorCount())
    // NOTE: We'll wait on this mutex - so it can't be recursive
    , m_transitionMutex() {}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! Nothing to release explicitly: members clean up after themselves.
ExecutorImp::~ExecutorImp() = default;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// A task is executable <==> its load allows it. The task load is considered
|
|
Shinya Kitaoka |
120a6e |
// fixed until another isExecutable() call is made again - in case it may
|
|
Shinya Kitaoka |
120a6e |
// change in time.
|
|
Shinya Kitaoka |
120a6e |
inline bool ExecutorImp::isExecutable(RunnableP &task) {
|
|
Shinya Kitaoka |
120a6e |
return m_activeLoad + task->m_load <= m_maxLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void ExecutorImp::insertTask(int schedulingPriority, RunnableP &task) {
|
|
Shinya Kitaoka |
120a6e |
task->m_schedulingPriority = schedulingPriority;
|
|
Shinya Kitaoka |
120a6e |
m_tasks.insert(schedulingPriority, task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//========================
|
|
Toshihiro Shimizu |
890ddd |
// Runnable methods
|
|
Toshihiro Shimizu |
890ddd |
//------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! Passes m_classCode to the TSmartObject base and starts with a zero m_id
//! (no executor id attached yet).
Runnable::Runnable()
    : TSmartObject(m_classCode)
    , m_id(0) {}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! Calls release() on the attached executor id, when there is one.
Runnable::~Runnable() {
  if (!m_id) return;

  m_id->release();  // see Executor::addTask()
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! Returns the predicted CPU load generated by the task, expressed in
|
|
Shinya Kitaoka |
120a6e |
//! percentage
|
|
Shinya Kitaoka |
120a6e |
//! of core usage (that is, 100 is intended to fully occupy one processing
|
|
Shinya Kitaoka |
120a6e |
//! core).
|
|
Shinya Kitaoka |
120a6e |
//! Appropriate task load calibration is an important step to take when
|
|
Shinya Kitaoka |
120a6e |
//! implementing
|
|
Toshihiro Shimizu |
890ddd |
//! a new task; for this purpose, remember some rules to follow:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Shinya Kitaoka |
120a6e |
//! In every moment, the task manager ensures that the overall sum of the
|
|
Shinya Kitaoka |
120a6e |
//! active
|
|
Shinya Kitaoka |
120a6e |
//! task's load does not exceed the number of machine's processing cores
|
|
Shinya Kitaoka |
120a6e |
//! multiplied
|
|
Shinya Kitaoka |
120a6e |
//! by 100. This condition is \a blocking with respect to the execution of
|
|
Shinya Kitaoka |
120a6e |
//! any other
|
|
Shinya Kitaoka |
120a6e |
//! task - meaning that when a task is about to be executed the task manager
|
|
Shinya Kitaoka |
120a6e |
//! \a waits
|
|
Toshihiro Shimizu |
890ddd |
//! until enough CPU resources are available to make it run.
|
|
Shinya Kitaoka |
120a6e |
//! In particular, observe that a task's load \b never has to exceed the
|
|
Shinya Kitaoka |
120a6e |
//! total CPU
|
|
Shinya Kitaoka |
120a6e |
//! resources - doing so would surely result in a block of your application.
|
|
Shinya Kitaoka |
120a6e |
//! The number
|
|
Shinya Kitaoka |
120a6e |
//! of available cores can be accessed via the \b
|
|
Shinya Kitaoka |
120a6e |
//! TSystem::getProcessorCount() or
|
|
Toshihiro Shimizu |
890ddd |
//! \b QThread::idealThreadCount().
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! The task load is considered constant for the duration of the task.
|
|
Shinya Kitaoka |
120a6e |
//! Changing its
|
|
Shinya Kitaoka |
120a6e |
//! value does not affect the task manager in any way once the task has been
|
|
Shinya Kitaoka |
120a6e |
//! started.
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
//! The default task load is 0, representing a very light task. If the task
|
|
Shinya Kitaoka |
120a6e |
//! load
|
|
Shinya Kitaoka |
120a6e |
//! is 0 the condition at point 1 always succeeds - so the task is always
|
|
Shinya Kitaoka |
120a6e |
//! executed when
|
|
Shinya Kitaoka |
120a6e |
//! encountered. Observe that a long succession of 0 task loads leads to the
|
|
Shinya Kitaoka |
120a6e |
//! creation of
|
|
Shinya Kitaoka |
120a6e |
//! a proportional number of threads simultaneously running to dispatch it;
|
|
Shinya Kitaoka |
120a6e |
//! if this is
|
|
Toshihiro Shimizu |
890ddd |
//! a problem, consider the use of \b Executor::setMaxActiveTasks()
|
|
Toshihiro Shimizu |
890ddd |
//! to make only a certain number of tasks being executed at the same time.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Shinya Kitaoka |
120a6e |
int Runnable::taskLoad() { return 0; }
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Returns the priority value used to schedule a task for execution. Tasks
|
|
Toshihiro Shimizu |
890ddd |
//! with higher priority start before tasks with lower priority. The default
|
|
Toshihiro Shimizu |
890ddd |
//! value returned is 5 (halfway from 0 to 10) - but any value other than
|
|
Toshihiro Shimizu |
890ddd |
//! (std::numeric_limits<int>::max)() is acceptable.</int>
|
|
Shinya Kitaoka |
120a6e |
int Runnable::schedulingPriority() { return 5; }
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Returns the QThread::Priority used by worker threads when they adopt
|
|
Toshihiro Shimizu |
890ddd |
//! the task. The default value returned is QThread::Normal.
|
|
Shinya Kitaoka |
120a6e |
QThread::Priority Runnable::runningPriority() {
|
|
Shinya Kitaoka |
120a6e |
return QThread::NormalPriority;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline bool Runnable::customConditions() {
|
|
Shinya Kitaoka |
120a6e |
return (m_id->m_activeTasks < m_id->m_maxActiveTasks) &&
|
|
Shinya Kitaoka |
120a6e |
(m_id->m_activeLoad + m_load <= m_id->m_maxActiveLoad);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::started(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
This signal is emitted from working threads just before the run() code
|
|
Toshihiro Shimizu |
890ddd |
is executed. Observe that the passed smart pointer ensures the survival of
|
|
Toshihiro Shimizu |
890ddd |
the emitting task for the time required by connected slots execution.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\warning The started(), finished() and exception() signals are emitted in
|
|
Toshihiro Shimizu |
890ddd |
a mutex-protected environment in order to provide the correct sequence of
|
|
Toshihiro Shimizu |
890ddd |
emissions (i.e. so that canceled() and terminated() controller-emitted signals
|
|
Toshihiro Shimizu |
890ddd |
are either delivered after started() and before finished() or exception(),
|
|
Toshihiro Shimizu |
890ddd |
or \a instead of them).
|
|
Shinya Kitaoka |
120a6e |
\warning Thus, setting up blocking connections or \a direct slots that contain
|
|
Shinya Kitaoka |
120a6e |
a
|
|
Toshihiro Shimizu |
890ddd |
blocking instruction or even calls to the Executor API (which would definitely
|
|
Toshihiro Shimizu |
890ddd |
try to relock the aforementioned mutex) is dangerous and could result in an
|
|
Toshihiro Shimizu |
890ddd |
application freeze.
|
|
Shinya Kitaoka |
120a6e |
\warning In case it's necessary to use blocking features, they should be
|
|
Shinya Kitaoka |
120a6e |
enforced
|
|
Toshihiro Shimizu |
890ddd |
through custom signals to be invoked manually in the run() method, outside the
|
|
Toshihiro Shimizu |
890ddd |
mutex.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b finished and \b exception signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::finished(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b finished signal is emitted from working threads once the run()
|
|
Toshihiro Shimizu |
890ddd |
code is returned without unmanaged exceptions.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b started and \b exception signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::exception(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b exception signal is emitted from working threads whenever an
|
|
Toshihiro Shimizu |
890ddd |
untrapped exception is found within the run() method.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b started and \b finished signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::canceled(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b canceled signal is emitted from the controller thread whenever
|
|
Toshihiro Shimizu |
890ddd |
a task which is currently under the task manager's control is canceled
|
|
Toshihiro Shimizu |
890ddd |
by the user (the signal is emitted from the thread invoking the cancel).
|
|
Toshihiro Shimizu |
890ddd |
Observe that tasks under execution are not stopped by the task manager
|
|
Toshihiro Shimizu |
890ddd |
when they are canceled, but the signal is emitted anyway - helping the
|
|
Toshihiro Shimizu |
890ddd |
user to stop the actual execution of the run() code in advance.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b Executor::removeTask and \b Executor::cancelAll methods.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::terminated(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b terminated signal is emitted from the controller thread when
|
|
Toshihiro Shimizu |
890ddd |
the Executor components are shutting down inside a call to
|
|
Toshihiro Shimizu |
890ddd |
Executor::shutdown(). Implementing a slot connected to this signal
|
|
Toshihiro Shimizu |
890ddd |
helps the user in controlling the flow of an Executor-multithreaded
|
|
Toshihiro Shimizu |
890ddd |
application when it is shutting down - for example, it can be imposed
|
|
Toshihiro Shimizu |
890ddd |
that the application must wait for the task to be finished, print logs
|
|
Toshihiro Shimizu |
890ddd |
or similar.
|
|
Toshihiro Shimizu |
890ddd |
This signal is always preceded by a canceled() signal, informing all
|
|
Toshihiro Shimizu |
890ddd |
active tasks that they should begin quitting on their own in a 'soft'
|
|
Toshihiro Shimizu |
890ddd |
way before brute termination may occur.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b Executor::shutdown static method and \b Runnable::canceled signal.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Convenience slot for the started() signal - so it's not necessary to declare
|
|
Toshihiro Shimizu |
890ddd |
//! the task in a header file for moc'ing each time. You must both reimplement
|
|
Toshihiro Shimizu |
890ddd |
//! \b and connect it to the started() signal to make it work.
|
|
Shinya Kitaoka |
120a6e |
void Runnable::onStarted(RunnableP) {}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogue of onStarted() for the finished() signal.
|
|
Shinya Kitaoka |
120a6e |
void Runnable::onFinished(RunnableP) {
  // Default implementation: intentionally does nothing.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogue of onStarted() for the exception() signal.
|
|
Shinya Kitaoka |
120a6e |
void Runnable::onException(RunnableP) {
  // Default implementation: intentionally does nothing.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogue of onStarted() for the canceled() signal.
|
|
Shinya Kitaoka |
120a6e |
void Runnable::onCanceled(RunnableP) {
  // Default implementation: intentionally does nothing.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogue of onStarted() for the terminated() signal.
|
|
Shinya Kitaoka |
120a6e |
void Runnable::onTerminated(RunnableP) {
  // Default implementation: intentionally does nothing.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==========================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorId methods
|
|
Toshihiro Shimizu |
890ddd |
//--------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// Builds an executor id with the default limits: at most one active task at a
// time, no cap on the active load, and neither dedicated nor persistent
// threads. A fresh numeric id is then taken from the global id pool.
ExecutorId::ExecutorId()
    : m_activeTasks(0)
    , m_maxActiveTasks(1)
    , m_activeLoad(0)
    // Parenthesized (max) keeps a function-like max() macro from expanding
    , m_maxActiveLoad((std::numeric_limits<int>::max)())
    , m_dedicatedThreads(false)
    , m_persistentThreads(false) {
  // The id pool and the waiting-flags pool are shared global state
  QMutexLocker transitionLocker(&globalImp->m_transitionMutex);

  m_id = globalImp->m_executorIdPool.acquire();

  // Keep the waiting-flags pool sized like the id pool
  globalImp->m_waitingFlagsPool.resize(globalImp->m_executorIdPool.size());
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Releases the sleeping dedicated workers (if any) and hands the numeric id
// back to the global pool.
ExecutorId::~ExecutorId() {
  QMutexLocker guard(&globalImp->m_transitionMutex);

  if (m_dedicatedThreads) {
    // Dropping persistence makes refreshDedicatedList() wake and dismiss
    // every sleeping worker.
    m_persistentThreads = false;
    refreshDedicatedList();
  }

  globalImp->m_executorIdPool.release(m_id);
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Make sure that sleeping workers are eliminated properly if the permanent
|
|
Shinya Kitaoka |
120a6e |
// workers count decreases.
|
|
Shinya Kitaoka |
120a6e |
void ExecutorId::refreshDedicatedList() {
|
|
Shinya Kitaoka |
120a6e |
// QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already
|
|
Shinya Kitaoka |
120a6e |
// covered
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (!m_dedicatedThreads || !m_persistentThreads) {
|
|
Shinya Kitaoka |
120a6e |
// Release all sleeping workers
|
|
Shinya Kitaoka |
120a6e |
// Wake them - they will exit on their own
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
unsigned int i, size = m_sleepings.size();
|
|
Shinya Kitaoka |
120a6e |
for (i = 0; i < size; ++i) {
|
|
Shinya Kitaoka |
120a6e |
m_sleepings[i]->m_exit = true;
|
|
Shinya Kitaoka |
120a6e |
m_sleepings[i]->m_waitCondition.wakeOne();
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_sleepings.clear();
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Worker methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// A new worker starts with no task, no master, and its exit flag set.
Worker::Worker()
    : QThread()
    , m_task(0)
    , m_master(0)
    , m_exit(true) {}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
Worker::~Worker() {
  // Nothing to do here.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void Worker::run() {
|
|
Shinya Kitaoka |
120a6e |
// Ensure atomicity of worker's state transitions
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker sl(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (shutdownVar) return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
for (;;) {
|
|
Shinya Kitaoka |
120a6e |
// Run the taken task
|
|
Shinya Kitaoka |
120a6e |
setPriority(m_task->runningPriority());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
try {
|
|
Shinya Kitaoka |
120a6e |
Q_EMIT m_task->started(m_task);
|
|
Shinya Kitaoka |
120a6e |
sl.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_task->run();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
sl.relock();
|
|
Shinya Kitaoka |
120a6e |
Q_EMIT m_task->finished(m_task);
|
|
Shinya Kitaoka |
120a6e |
} catch (...) {
|
|
Shinya Kitaoka |
120a6e |
sl.relock(); // throw must be in the run() block
|
|
Shinya Kitaoka |
120a6e |
Q_EMIT m_task->exception(m_task);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
updateCountsOnRelease();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (shutdownVar) return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Get the next task
|
|
Shinya Kitaoka |
120a6e |
takeTask();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (!m_task) {
|
|
Shinya Kitaoka |
120a6e |
onFinish();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (!m_exit && !shutdownVar) {
|
|
Shinya Kitaoka |
120a6e |
// Put the worker to sleep
|
|
Shinya Kitaoka |
120a6e |
m_waitCondition.wait(sl.mutex());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Upon thread destruction the wait condition is implicitly woken up.
|
|
Shinya Kitaoka |
120a6e |
// If this is the case, m_task == 0 and we return.
|
|
Shinya Kitaoka |
120a6e |
if (!m_task || shutdownVar) return;
|
|
Shinya Kitaoka |
120a6e |
} else
|
|
Shinya Kitaoka |
120a6e |
return;
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void Worker::updateCountsOnTake() {
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_activeLoad += m_task->m_load;
|
|
Shinya Kitaoka |
120a6e |
m_task->m_id->m_activeLoad += m_task->m_load;
|
|
Shinya Kitaoka |
120a6e |
++m_task->m_id->m_activeTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void Worker::updateCountsOnRelease() {
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_activeLoad -= m_task->m_load;
|
|
Shinya Kitaoka |
120a6e |
m_task->m_id->m_activeLoad -= m_task->m_load;
|
|
Shinya Kitaoka |
120a6e |
--m_task->m_id->m_activeTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline void Worker::onFinish() {
|
|
Shinya Kitaoka |
120a6e |
if (m_master && m_master->m_dedicatedThreads &&
|
|
Shinya Kitaoka |
120a6e |
m_master->m_persistentThreads) {
|
|
Shinya Kitaoka |
120a6e |
m_exit = false;
|
|
Shinya Kitaoka |
120a6e |
m_master->m_sleepings.push_back(this);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Unlock the mutex - since eventually invoked ~ExecutorId will relock it...
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_transitionMutex.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_master =
|
|
Shinya Kitaoka |
120a6e |
0; // Master may be destroyed here - and m_exit= true for all sleepings
|
|
Shinya Kitaoka |
120a6e |
// in that case
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_transitionMutex.lock();
|
|
Shinya Kitaoka |
120a6e |
} else {
|
|
Shinya Kitaoka |
120a6e |
m_exit = true;
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_workers.erase(this);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Executor methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
Executor::Executor() : m_id(new ExecutorId) {
  // Hold one reference on the id; ~Executor() releases it.
  m_id->addRef();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
Executor::~Executor() {
  // Give back the reference taken in the constructor.
  m_id->release();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! This static method declares the use of the Executor's task manager into
|
|
Toshihiro Shimizu |
890ddd |
//! the application code. Be sure to use it according to the following rules:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
//! Only QCoreApplications or QApplications may use Executors.
|
|
Shinya Kitaoka |
120a6e |
//! This method must be invoked in a thread which performs constant Qt
|
|
Shinya Kitaoka |
120a6e |
//! event
|
|
Toshihiro Shimizu |
890ddd |
//! processing - like the main loop of interactive GUI applications.
|
|
Toshihiro Shimizu |
890ddd |
//! No task processing is allowed after event processing stops.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Shinya Kitaoka |
120a6e |
void Executor::init() {
|
|
Shinya Kitaoka |
120a6e |
// If no global ExecutorImp exists, allocate it now. You may not move this
|
|
Shinya Kitaoka |
120a6e |
// to a static declaration, since ExecutorImpSlots's connections must be
|
|
Shinya Kitaoka |
120a6e |
// made once the QCoreApplication has been constructed.
|
|
Shinya Kitaoka |
120a6e |
if (!globalImp) {
|
|
Shinya Kitaoka |
120a6e |
globalImp = new ExecutorImp;
|
|
Shinya Kitaoka |
120a6e |
globalImpSlots = new ExecutorImpSlots;
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
qRegisterMetaType<tthread::runnablep>("TThread::RunnableP");</tthread::runnablep>
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
//! This static method, which \b must be invoked in the controller thread,
|
|
Shinya Kitaoka |
120a6e |
//! declares
|
|
Shinya Kitaoka |
120a6e |
//! termination of all Executor-based components, forcing the execution of tasks
|
|
Shinya Kitaoka |
120a6e |
//! submitted
|
|
Toshihiro Shimizu |
890ddd |
//! by any Executor to quit as soon as possible in a safe way.
|
|
Shinya Kitaoka |
120a6e |
//! When the shutdown method is invoked, the task manager first emits a
|
|
Shinya Kitaoka |
120a6e |
//! canceled()
|
|
Shinya Kitaoka |
120a6e |
//! signal for all the tasks that were submitted to it, independently from the
|
|
Shinya Kitaoka |
120a6e |
//! Executor that
|
|
Shinya Kitaoka |
120a6e |
//! performed the submission; then, tasks that are still active once all the
|
|
Shinya Kitaoka |
120a6e |
//! cancellation signals
|
|
Shinya Kitaoka |
120a6e |
//! were delivered further receive a terminated() signal informing them that
|
|
Shinya Kitaoka |
120a6e |
//! they must provide
|
|
Shinya Kitaoka |
120a6e |
//! code termination (or at least remain silent in a safe state until the
|
|
Shinya Kitaoka |
120a6e |
//! application quits).
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
//! \b NOTE: Observe that this method does not explicitly wait for all the tasks
|
|
Shinya Kitaoka |
120a6e |
//! to terminate - this depends
|
|
Shinya Kitaoka |
120a6e |
//! on the code connected to the terminated() signal and is under the user's
|
|
Shinya Kitaoka |
120a6e |
//! responsibility (see the
|
|
Shinya Kitaoka |
120a6e |
//! remarks specified in started() signal description); if this is the intent
|
|
Shinya Kitaoka |
120a6e |
//! and the terminated slot
|
|
Shinya Kitaoka |
120a6e |
//! is invoked in the controller thread, you should remember to implement a
|
|
Shinya Kitaoka |
120a6e |
//! local event loop in it (so that
|
|
Shinya Kitaoka |
120a6e |
//! event processing is still performed) and wait there until the first
|
|
Shinya Kitaoka |
120a6e |
//! finished() or exception()
|
|
Toshihiro Shimizu |
890ddd |
//! slot make it quit.
|
|
Shinya Kitaoka |
120a6e |
void Executor::shutdown() {
|
|
Shinya Kitaoka |
120a6e |
{
|
|
Shinya Kitaoka |
120a6e |
// Updating tasks list - lock against state transitions
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
shutdownVar = true;
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Cancel all tasks - first the active ones
|
|
Shinya Kitaoka |
120a6e |
std::set<worker *="">::iterator it;</worker>
|
|
Shinya Kitaoka |
120a6e |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end();
|
|
Shinya Kitaoka |
120a6e |
++it) {
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = (*it)->m_task;
|
|
Shinya Kitaoka |
120a6e |
if (task) Q_EMIT task->canceled(task);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Finally, deal with the global queue tasks
|
|
Shinya Kitaoka |
120a6e |
QMutableMapIterator<int, runnablep=""> jt(globalImp->m_tasks);</int,>
|
|
Shinya Kitaoka |
120a6e |
while (jt.hasNext()) {
|
|
Shinya Kitaoka |
120a6e |
jt.next();
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = jt.value();
|
|
Shinya Kitaoka |
120a6e |
Q_EMIT task->canceled(task);
|
|
Shinya Kitaoka |
120a6e |
jt.remove();
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Now, send the terminate() signal to all active tasks
|
|
Shinya Kitaoka |
120a6e |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end();
|
|
Shinya Kitaoka |
120a6e |
++it) {
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = (*it)->m_task;
|
|
Shinya Kitaoka |
120a6e |
if (task) Q_EMIT task->terminated(task);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Just placing a convenience processEvents() to make sure that queued slots
|
|
Shinya Kitaoka |
120a6e |
// invoked by the
|
|
Shinya Kitaoka |
120a6e |
// signals above are effectively invoked in this method - without having to
|
|
Shinya Kitaoka |
120a6e |
// return to an event loop.
|
|
Shinya Kitaoka |
120a6e |
QCoreApplication::processEvents();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Specifies the use of dedicated threads for the Executor's task group.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! By default a worker thread attempts adoption of Runnable tasks
|
|
Toshihiro Shimizu |
890ddd |
//! without regard to the Executor that performed the submission. This helps
|
|
Toshihiro Shimizu |
890ddd |
//! in stabilizing the number of threads that are created and destroyed
|
|
Toshihiro Shimizu |
890ddd |
//! by the task manager - but may be a problem in some cases.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Using this method the user can explicitly tell the Executor to seize the
|
|
Toshihiro Shimizu |
890ddd |
//! ownership of worker threads assigned to its tasks, so that they will not
|
|
Toshihiro Shimizu |
890ddd |
//! try adoption of external tasks but instead remain focused on Executor's
|
|
Toshihiro Shimizu |
890ddd |
//! tasks only.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! An optional \b persistent parameter may be passed, which specifies if
|
|
Toshihiro Shimizu |
890ddd |
//! dedicated threads should remain sleeping or should rather die when no
|
|
Toshihiro Shimizu |
890ddd |
//! processable tasks from the Executor are found.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! This method is especially helpful in two occasions:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
//! The Executor's tasks use thread-specific data such as QThreadStorages,
|
|
Toshihiro Shimizu |
890ddd |
//! which may be recycled among different tasks.
|
|
Shinya Kitaoka |
120a6e |
//! The Executor receives tasks at a frequent rate, but mostly ends each
|
|
Shinya Kitaoka |
120a6e |
//! one before
|
|
Toshihiro Shimizu |
890ddd |
//! another one is submitted - resulting in a continuous thread turnover.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void Executor::setDedicatedThreads(bool dedicated, bool persistent) {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_id->m_dedicatedThreads = dedicated;
|
|
Shinya Kitaoka |
120a6e |
m_id->m_persistentThreads = persistent;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_id->refreshDedicatedList();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Submits a task for execution. The task is executed according to
|
|
Toshihiro Shimizu |
890ddd |
//! its task load, insertion time and scheduling priority.
|
|
Shinya Kitaoka |
120a6e |
void Executor::addTask(RunnableP task) {
  // Drop the reference to the id the task was previously bound to, if any.
  // Must be done outside the transition lock, since an eventually invoked
  // ~ExecutorId will lock it.
  if (task->m_id) task->m_id->release();

  {
    // Updating tasks and workers list - lock against state transitions
    QMutexLocker guard(&globalImp->m_transitionMutex);

    // Bind the task to this executor's id, taking a reference on it
    m_id->addRef();
    task->m_id = m_id;

    globalImp->insertTask(task->schedulingPriority(), task);
  }

  // If addTask is called in the main thread, the emit works directly -
  // so it is necessary to unlock the mutex *before* emitting the refresh.
  globalImpSlots->emitRefreshAssignments();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Removes the given task from scheduled execution and emits its
|
|
Toshihiro Shimizu |
890ddd |
//! Runnable::canceled signal. Tasks already under execution are not
|
|
Toshihiro Shimizu |
890ddd |
//! stopped by this method - although the canceled signal is still emitted.
|
|
Shinya Kitaoka |
120a6e |
//! It has no effect if the task is not currently under the task manager's
|
|
Shinya Kitaoka |
120a6e |
//! control.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa \b Runnable::canceled signal and the \b cancelAll method.
|
|
Shinya Kitaoka |
120a6e |
void Executor::removeTask(RunnableP task) {
  // A null task cannot be under the task manager's control. Checking here
  // also avoids dereferencing a null pointer in the ownership test below.
  if (!task) return;

  // If the task does not belong to this Executor, quit.
  if (task->m_id != m_id) return;

  // Updating tasks list - lock against state transitions
  QMutexLocker transitionLocker(&globalImp->m_transitionMutex);

  // Then, look in the global queue - if it is found, eliminate the task and
  // send the canceled signal.
  if (globalImp->m_tasks.remove(task->m_schedulingPriority, task)) {
    Q_EMIT task->canceled(task);
    return;
  }

  // Finally, the task may be running - look in workers. A running task is
  // not stopped; only its canceled signal is emitted.
  std::set<Worker *> &workers = globalImp->m_workers;
  std::set<Worker *>::iterator it;
  for (it = workers.begin(); it != workers.end(); ++it)
    if ((*it)->m_task == task) Q_EMIT task->canceled(task);

  // No need to refresh - tasks were eventually decremented...
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Clears the task manager of all tasks added by this Executor and emits
|
|
Toshihiro Shimizu |
890ddd |
//! the Runnable::canceled signal for each of them. The same specifications
|
|
Toshihiro Shimizu |
890ddd |
//! described in the \b removeTask method apply here.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa \b Runnable::canceled signal and the \b removeTask method.
|
|
Shinya Kitaoka |
120a6e |
void Executor::cancelAll() {
|
|
Shinya Kitaoka |
120a6e |
// Updating tasks list - lock against state transitions
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Clear the tasks chronologically. So, first check currently working
|
|
Shinya Kitaoka |
120a6e |
// tasks.
|
|
Shinya Kitaoka |
120a6e |
std::set<worker *="">::iterator it;</worker>
|
|
Shinya Kitaoka |
120a6e |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end();
|
|
Shinya Kitaoka |
120a6e |
++it) {
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = (*it)->m_task;
|
|
Shinya Kitaoka |
120a6e |
if (task && task->m_id == m_id) Q_EMIT task->canceled(task);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Finally, clear the global tasks list from all tasks inserted by this
|
|
Shinya Kitaoka |
120a6e |
// executor
|
|
Shinya Kitaoka |
120a6e |
// NOTE: An easier way here?
|
|
Shinya Kitaoka |
120a6e |
QMutableMapIterator<int, runnablep=""> jt(globalImp->m_tasks);</int,>
|
|
Shinya Kitaoka |
120a6e |
while (jt.hasNext()) {
|
|
Shinya Kitaoka |
120a6e |
jt.next();
|
|
Shinya Kitaoka |
120a6e |
if (jt.value()->m_id == m_id) {
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = jt.value();
|
|
Shinya Kitaoka |
120a6e |
Q_EMIT task->canceled(task);
|
|
Shinya Kitaoka |
120a6e |
jt.remove();
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Declares that only a certain number of tasks added by this Executor
|
|
Toshihiro Shimizu |
890ddd |
//! may be processed simultaneously. The default is 1 - meaning that tasks
|
|
Toshihiro Shimizu |
890ddd |
//! added to the executor are completely serialized. A negative task number
|
|
Toshihiro Shimizu |
890ddd |
//! disables any form of task serialization.
|
|
Toshihiro Shimizu |
890ddd |
//! \b NOTE: Currently, tasks that do not
|
|
Toshihiro Shimizu |
890ddd |
//! satisfy this condition avoid blocking execution of tasks not
|
|
Toshihiro Shimizu |
890ddd |
//! added by the same Executor - even if they were scheduled for later
|
|
Toshihiro Shimizu |
890ddd |
//! execution.
|
|
Shinya Kitaoka |
120a6e |
void Executor::setMaxActiveTasks(int maxActiveTasks) {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
if (maxActiveTasks <= 0)
|
|
Shinya Kitaoka |
120a6e |
m_id->m_maxActiveTasks = (std::numeric_limits<int>::max)();</int>
|
|
Shinya Kitaoka |
120a6e |
else
|
|
Shinya Kitaoka |
120a6e |
m_id->m_maxActiveTasks = maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int Executor::maxActiveTasks() const {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Shinya Kitaoka |
120a6e |
return m_id->m_maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Declares a maximum overall task load for the tasks added by this Executor.
|
|
Toshihiro Shimizu |
890ddd |
//! \b NOTE: The same remark for setMaxActiveTasks() holds here.
|
|
Shinya Kitaoka |
120a6e |
void Executor::setMaxActiveLoad(int maxActiveLoad) {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_id->m_maxActiveLoad = maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int Executor::maxActiveLoad() const {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Shinya Kitaoka |
120a6e |
return m_id->m_maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorImpSlots methods
|
|
Toshihiro Shimizu |
890ddd |
//----------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
ExecutorImpSlots::ExecutorImpSlots() {
  // Wire this object's own refreshAssignments() signal to its
  // onRefreshAssignments() slot.
  QObject::connect(this, SIGNAL(refreshAssignments()), this,
                   SLOT(onRefreshAssignments()));
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
ExecutorImpSlots::~ExecutorImpSlots() {
  // Nothing to do here.
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Lets code outside this class trigger the refreshAssignments() signal.
void ExecutorImpSlots::emitRefreshAssignments() {
  Q_EMIT refreshAssignments();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void ExecutorImpSlots::onRefreshAssignments() {
|
|
Shinya Kitaoka |
120a6e |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
globalImp->refreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void ExecutorImpSlots::onTerminated() { delete QObject::sender(); }
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
// Task adoption methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Hands the task to a worker: a sleeping one is reused when available,
// otherwise a new worker thread is created and started.
inline void ExecutorId::newWorker(RunnableP &task) {
  const bool reuseSleeper = m_sleepings.size() != 0;

  Worker *worker;
  if (reuseSleeper) {
    // Take the first sleeping worker out of the list
    worker = m_sleepings.front();
    m_sleepings.pop_front();
  } else {
    // Create a worker, register it globally and arrange for its deletion
    // once its thread has finished
    worker = new Worker;
    globalImp->m_workers.insert(worker);
    QObject::connect(worker, SIGNAL(finished()), globalImpSlots,
                     SLOT(onTerminated()));
  }

  // In both cases the task is bound and counted before the worker runs
  worker->m_task = task;
  worker->updateCountsOnTake();

  if (reuseSleeper)
    worker->m_waitCondition.wakeOne();
  else
    worker->start();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Binds the given task to this worker and adds it to the active counters.
inline void Worker::adoptTask(RunnableP &task) {
  this->m_task = task;

  // Must come after the assignment: the counters are read through m_task
  this->updateCountsOnTake();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * Le task timedOut sono ex-accumulate che non soddisfano le condizioni
|
|
Toshihiro Shimizu |
890ddd |
// standard. Quindi sono bloccanti *ovunque*.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * The refresh of workers on accumulated tasks *must* take place:
// -If and only if a task of the same executor finishes,
// since that is the only case in which the custom conditions
// get updated
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * If a dedicated thread cannot take a foreign task, that does not mean
// that others cannot!
|
|
Toshihiro Shimizu |
890ddd |
// * Tasks requiring dedication should be adopted by threads that are
// already dedicated, if any exist! => Executors requiring dedicated threads
// create them separately and share them with nobody: that is, threads created
// without dedication must not be able to adopt tasks requiring dedication!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Assigns tasks polled from the id's accumulation queue (if id is given) and
|
|
Shinya Kitaoka |
120a6e |
// the global tasks queue.
|
|
Shinya Kitaoka |
120a6e |
// It works like:
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// a) First look if there exist tasks with timedOut priority and if so
|
|
Toshihiro Shimizu |
890ddd |
// try to take them out
|
|
Toshihiro Shimizu |
890ddd |
// b) Then look for tasks in the id's accumulation queue
|
|
Toshihiro Shimizu |
890ddd |
// c) Finally search in the remaining global tasks queue
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
void ExecutorImp::refreshAssignments() {
|
|
Shinya Kitaoka |
120a6e |
// QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already
|
|
Shinya Kitaoka |
120a6e |
// covered
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
if (m_tasks.isEmpty()) return;
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// Erase the id vector data
|
|
Shinya Kitaoka |
120a6e |
assert(m_executorIdPool.size() == m_waitingFlagsPool.size());
|
|
Shinya Kitaoka |
120a6e |
memset(&m_waitingFlagsPool.front(), 0, m_waitingFlagsPool.size());
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
// c) Try with the global queue
|
|
Shinya Kitaoka |
120a6e |
int e, executorsCount = m_executorIdPool.acquiredSize();
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
int i, tasksCount = m_tasks.size();
|
|
Shinya Kitaoka |
120a6e |
QMultiMap<int, runnablep="">::iterator it;</int,>
|
|
Shinya Kitaoka |
120a6e |
for (i = 0, e = 0, it = m_tasks.end() - 1;
|
|
Shinya Kitaoka |
120a6e |
i < tasksCount && e < executorsCount; ++i, --it) {
|
|
Shinya Kitaoka |
120a6e |
// std::cout<< "global tasks-refreshAss" << std::endl;
|
|
Shinya Kitaoka |
120a6e |
// Take the task
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = it.value();
|
|
Shinya Kitaoka |
120a6e |
task->m_load = task->taskLoad();
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
UCHAR &idWaitingForAnotherTask = m_waitingFlagsPool[task->m_id->m_id];
|
|
Shinya Kitaoka |
120a6e |
if (idWaitingForAnotherTask) continue;
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
if (!isExecutable(task)) break;
|
|
Shinya Kitaoka |
120a6e |
|
|
Shinya Kitaoka |
120a6e |
if (!task->customConditions()) {
|
|
Shinya Kitaoka |
120a6e |
++e;
|
|
Shinya Kitaoka |
120a6e |
idWaitingForAnotherTask = 1;
|
|
Shinya Kitaoka |
120a6e |
} else {
|
|
Shinya Kitaoka |
120a6e |
task->m_id->newWorker(task);
|
|
Shinya Kitaoka |
120a6e |
it = m_tasks.erase(it);
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
inline bool Worker::canAdopt(const RunnableP &task) {
|
|
Shinya Kitaoka |
120a6e |
return task->m_id->m_sleepings.size() ==
|
|
Shinya Kitaoka |
120a6e |
0 && // Always prefer sleeping dedicateds if present
|
|
Shinya Kitaoka |
120a6e |
(!m_master || (m_master.getPointer() == task->m_id)); // If was seized
|
|
Shinya Kitaoka |
120a6e |
// by an
|
|
Shinya Kitaoka |
120a6e |
// Executor,
|
|
Shinya Kitaoka |
120a6e |
// ensure task
|
|
Shinya Kitaoka |
120a6e |
// compatibility
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Takes a task and assigns it to the worker in a way similar to the one above.
|
|
Shinya Kitaoka |
120a6e |
inline void Worker::takeTask() {
|
|
Shinya Kitaoka |
120a6e |
TSmartPointerT<executorid> oldId = m_task->m_id;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// When a new task is taken, the old one's Executor may seize the worker
|
|
Shinya Kitaoka |
120a6e |
m_master = oldId->m_dedicatedThreads ? oldId : (TSmartPointerT<executorid>)0;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// c) No accumulated task can be taken - look for a task in the global tasks
|
|
Shinya Kitaoka |
120a6e |
// queue.
|
|
Shinya Kitaoka |
120a6e |
// If the active load admits it, take the earliest task.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Free the old task. NOTE: This instruction MUST be performed OUTSIDE the
|
|
Shinya Kitaoka |
120a6e |
// mutex-protected environment -
|
|
Shinya Kitaoka |
120a6e |
// since user code may be executed upon task destruction - including the mutex
|
|
Shinya Kitaoka |
120a6e |
// relock!!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_transitionMutex.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
m_task = 0;
|
|
Shinya Kitaoka |
120a6e |
oldId = TSmartPointerT<executorid>();</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
globalImp->m_transitionMutex.lock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Erase the executor id status pool
|
|
Shinya Kitaoka |
120a6e |
tcg::indices_pool<> &executorIdPool = globalImp->m_executorIdPool;
|
|
Shinya Kitaoka |
120a6e |
std::vector<uchar> &waitingFlagsPool = globalImp->m_waitingFlagsPool;</uchar>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
assert(waitingFlagsPool.size() == globalImp->m_executorIdPool.size());
|
|
Shinya Kitaoka |
120a6e |
memset(&waitingFlagsPool.front(), 0, waitingFlagsPool.size());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int e, executorsCount = executorIdPool.acquiredSize();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
int i, tasksCount = globalImp->m_tasks.size();
|
|
Shinya Kitaoka |
120a6e |
QMultiMap<int, runnablep="">::iterator it;</int,>
|
|
Shinya Kitaoka |
120a6e |
for (i = 0, e = 0, it = globalImp->m_tasks.end() - 1;
|
|
Shinya Kitaoka |
120a6e |
i < tasksCount && e < executorsCount; ++i, --it) {
|
|
Shinya Kitaoka |
120a6e |
// std::cout<< "global tasks-takeTask" << std::endl;
|
|
Shinya Kitaoka |
120a6e |
// Take the first task
|
|
Shinya Kitaoka |
120a6e |
RunnableP task = it.value();
|
|
Shinya Kitaoka |
120a6e |
task->m_load = task->taskLoad();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
UCHAR &idWaitingForAnotherTask = waitingFlagsPool[task->m_id->m_id];
|
|
Shinya Kitaoka |
120a6e |
if (idWaitingForAnotherTask) continue;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
if (!globalImp->isExecutable(task)) break;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// In case the worker was captured for dedication, check the task
|
|
Shinya Kitaoka |
120a6e |
// compatibility.
|
|
Shinya Kitaoka |
120a6e |
if (!canAdopt(task)) {
|
|
Shinya Kitaoka |
120a6e |
// some other worker may still take the task...
|
|
Shinya Kitaoka |
120a6e |
globalImpSlots->emitRefreshAssignments();
|
|
Shinya Kitaoka |
120a6e |
break;
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
// Test its custom conditions
|
|
Shinya Kitaoka |
120a6e |
if (!task->customConditions()) {
|
|
Shinya Kitaoka |
120a6e |
++e;
|
|
Shinya Kitaoka |
120a6e |
idWaitingForAnotherTask = 1;
|
|
Shinya Kitaoka |
120a6e |
} else {
|
|
Shinya Kitaoka |
120a6e |
adoptTask(task);
|
|
Shinya Kitaoka |
120a6e |
it = globalImp->m_tasks.erase(it);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Shinya Kitaoka |
120a6e |
globalImpSlots->emitRefreshAssignments();
|
|
Shinya Kitaoka |
120a6e |
break;
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Shinya Kitaoka |
120a6e |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|