|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
#ifdef _DEBUG
|
|
Toshihiro Shimizu |
890ddd |
#define _STLP_DEBUG 1
|
|
Toshihiro Shimizu |
890ddd |
#endif
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// TnzCore includes
|
|
Toshihiro Shimizu |
890ddd |
#include "tthreadmessage.h"
|
|
Toshihiro Shimizu |
890ddd |
#include "tsystem.h"
|
|
Toshihiro Shimizu |
890ddd |
#include "tatomicvar.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
#include "tthread.h"
|
|
Toshihiro Shimizu |
890ddd |
#include "tthreadP.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// STL includes
|
|
Toshihiro Shimizu |
890ddd |
#include <set>
|
|
Toshihiro Shimizu |
890ddd |
#include <deque>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// tcg includes
|
|
Toshihiro Shimizu |
890ddd |
#include "tcg/tcg_pool.h"
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Qt includes
|
|
Toshihiro Shimizu |
890ddd |
#include <QMultiMap>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMutableMapIterator>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMutex>
|
|
Toshihiro Shimizu |
890ddd |
#include <QWaitCondition>
|
|
Toshihiro Shimizu |
890ddd |
#include <QMetaType>
|
|
Toshihiro Shimizu |
890ddd |
#include <QCoreApplication>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================================================
|
|
Toshihiro Shimizu |
890ddd |
// Paradigms of the Executor tasks management
|
|
Toshihiro Shimizu |
890ddd |
//--------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Basics:
|
|
Toshihiro Shimizu |
890ddd |
// * Tasks added by Executors are always stored in a global QMultiMap first -
|
|
Toshihiro Shimizu |
890ddd |
// ordering primarily being the schedulingPriority(), and insertion instant
|
|
Toshihiro Shimizu |
890ddd |
// (implicit) when they have the same scheduling priority.
|
|
Toshihiro Shimizu |
890ddd |
// * The QMultiMap needs to be reverse-iterated to do that, since values with
|
|
Toshihiro Shimizu |
890ddd |
// the same key are ordered from the most recent to the oldest one (see Qt's
|
|
Toshihiro Shimizu |
890ddd |
// manual).
|
|
Toshihiro Shimizu |
890ddd |
// * Worker threads are stored in a global set.
|
|
Toshihiro Shimizu |
890ddd |
// * When a task is added or a task has been performed, the workers list is
|
|
Toshihiro Shimizu |
890ddd |
// refreshed, possibly adding new Workers for some executable tasks.
|
|
Toshihiro Shimizu |
890ddd |
// * When a worker ends a task, it automatically takes a new one before refreshing
|
|
Toshihiro Shimizu |
890ddd |
// the workers list. If no task can be taken, by default the thread exits and
|
|
Toshihiro Shimizu |
890ddd |
// invokes its own destruction.
|
|
Toshihiro Shimizu |
890ddd |
// * The thread may instead be put to rest if explicitly told by the user with
|
|
Toshihiro Shimizu |
890ddd |
// the appropriate method.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Default execution conditions:
|
|
Toshihiro Shimizu |
890ddd |
// * A task is executable if, by default, its task load added to the sum of that of
|
|
Toshihiro Shimizu |
890ddd |
// all other active tasks does not exceed the available resources of the system
|
|
Toshihiro Shimizu |
890ddd |
// (i.e.: 100 * # of cores of the machine).
|
|
Toshihiro Shimizu |
890ddd |
// In other words, in every instant of execution, the sum of all active task's loads
|
|
Toshihiro Shimizu |
890ddd |
// never exceeds the available machine resources.
|
|
Toshihiro Shimizu |
890ddd |
// * When such default execution condition is not met when attempting to take the
|
|
Toshihiro Shimizu |
890ddd |
// task, no other task is taken instead - we wait until enough resources have been
|
|
Toshihiro Shimizu |
890ddd |
// freed before attempting to take the same task again.
|
|
Toshihiro Shimizu |
890ddd |
// In other words, the default execution condition is *BLOCKING*.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Custom execution conditions:
|
|
Toshihiro Shimizu |
890ddd |
// * The user may decide to impose more tight conditions for tasks added by a certain
|
|
Toshihiro Shimizu |
890ddd |
// Executor. Let's call such conditions 'custom' conditions.
|
|
Toshihiro Shimizu |
890ddd |
// * Custom conditions are always tested *AFTER* the default ones (on the same task).
|
|
Toshihiro Shimizu |
890ddd |
// This is necessary to enforce the global scheduling priorities mechanism.
|
|
Toshihiro Shimizu |
890ddd |
// * When no task of a certain executor is active, custom conditions are always considered
|
|
Toshihiro Shimizu |
890ddd |
// satisfied.
|
|
Toshihiro Shimizu |
890ddd |
// * If custom conditions are not met, we enqueue the task in an Executor-private queue
|
|
Toshihiro Shimizu |
890ddd |
// for later execution and remove it from the global tasks queue - making it possible
|
|
Toshihiro Shimizu |
890ddd |
// to perform other tasks before our custom-failed one (such operation will be called
|
|
Toshihiro Shimizu |
890ddd |
// "ACCUMULATION").
|
|
Toshihiro Shimizu |
890ddd |
// In other words, custom conditions are *NOT BLOCKING* among different Executors.
|
|
Toshihiro Shimizu |
890ddd |
// * Tasks in the last task's Executor-private queue are always polled before those in
|
|
Toshihiro Shimizu |
890ddd |
// the global queue by Workers which ended a task, and this is done in a *BLOCKING* way
|
|
Toshihiro Shimizu |
890ddd |
// inside the same Executor.
|
|
Toshihiro Shimizu |
890ddd |
// * When an Executor-private tasks queue is not empty, all the other tasks added by the
|
|
Toshihiro Shimizu |
890ddd |
// same executor are scheduled in the queue.
|
|
Toshihiro Shimizu |
890ddd |
// In other words, the order of execution is always that of global insertion, *inside
|
|
Toshihiro Shimizu |
890ddd |
// the same executor*.
|
|
Toshihiro Shimizu |
890ddd |
// * Tasks polled from an Executor-private queue (which therefore satisfied custom conditions)
|
|
Toshihiro Shimizu |
890ddd |
// may still fail with default execution conditions. If that happens, we put the task
|
|
Toshihiro Shimizu |
890ddd |
// back into the global queue with highest possible priority (timedOut) and the worker dies.
|
|
Toshihiro Shimizu |
890ddd |
// Tasks with this special priority are polled *before every other task*. So, again, default
|
|
Toshihiro Shimizu |
890ddd |
// conditions are *BLOCKING*.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Thread-safety:
|
|
Toshihiro Shimizu |
890ddd |
// * Most of the following code is mutex-protected, although it might not seem so - indeed,
|
|
Toshihiro Shimizu |
890ddd |
// only *one* mutex is locked and unlocked all of the time. This 'transition mutex' is
|
|
Toshihiro Shimizu |
890ddd |
// the key to thread-safety: we're considered to lie in a 'transition state' if we are
|
|
Toshihiro Shimizu |
890ddd |
// operating outside the run() of some task - which covers almost the totality of the code.
|
|
Toshihiro Shimizu |
890ddd |
// * The transition mutex is *not* recursive. The reason is that threads cannot wait on
|
|
Toshihiro Shimizu |
890ddd |
// QWaitConditions if the mutex is recursive. That makes it necessary (and welcome) to put
|
|
Toshihiro Shimizu |
890ddd |
// mutex lockers in strategic points of the code - in many low-level functions no mutex
|
|
Toshihiro Shimizu |
890ddd |
// is locked *because some caller already did it before*. If you're modifying the code
|
|
Toshihiro Shimizu |
890ddd |
// always trace back the callers of a function before inserting misleading mutex lockers.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================
|
|
Toshihiro Shimizu |
890ddd |
// TODO list
|
|
Toshihiro Shimizu |
890ddd |
//------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * Improve dedicated threads support: make sure that tasks added to a dedicated
|
|
Toshihiro Shimizu |
890ddd |
// executor are directly moved to the accumulation queue. The setDedicatedThreads()
|
|
Toshihiro Shimizu |
890ddd |
// method must therefore react accordingly.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * It could be possible to implement a dependency-based mechanism...
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * Should the hosting thread wait for worker ones upon ExecutorImp destruction??
|
|
Toshihiro Shimizu |
890ddd |
// It could be a problem on some forever-waiting tasks...
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * Double-check that, after the latest changes, the ExecutorIds persist when
|
|
Toshihiro Shimizu |
890ddd |
// pointers are passed in takeTask and refreshAss..
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
using namespace TThread;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
DEFINE_CLASS_CODE(TThread::Runnable, 21)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==========================================
|
|
Toshihiro Shimizu |
890ddd |
// Global init() initializer function
|
|
Toshihiro Shimizu |
890ddd |
//------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void TThread::init()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
Executor::init();
|
|
Toshihiro Shimizu |
890ddd |
TThreadMessageDispatcher::init();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void TThread::shutdown()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
Executor::shutdown();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
namespace TThread
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==============================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Worker Thread class
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! A Worker is a specialized QThread that continuously polls Runnable
|
|
Toshihiro Shimizu |
890ddd |
//! tasks from a global execution queue to make them work.
|
|
Toshihiro Shimizu |
890ddd |
class Worker : public QThread
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Toshihiro Shimizu |
890ddd |
RunnableP m_task;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
TSmartPointerT<executorid> m_master;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
bool m_exit;
|
|
Toshihiro Shimizu |
890ddd |
QWaitCondition m_waitCondition;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
Worker();
|
|
Toshihiro Shimizu |
890ddd |
~Worker();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void run();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void takeTask();
|
|
Toshihiro Shimizu |
890ddd |
inline bool canAdopt(const RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
inline void adoptTask(RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void rest();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void updateCountsOnTake();
|
|
Toshihiro Shimizu |
890ddd |
inline void updateCountsOnRelease();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void onFinish();
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//========================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorId class
|
|
Toshihiro Shimizu |
890ddd |
//------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Contains all the executor data that need to persist until all tasks
|
|
Toshihiro Shimizu |
890ddd |
//! added by the executor have been processed. Upon creation of an Executor
|
|
Toshihiro Shimizu |
890ddd |
//! object, it instantiates one ExecutorId for both storing the Executor's data
|
|
Toshihiro Shimizu |
890ddd |
//! sensible to the underlying code of the Executor manager, and to identify all
|
|
Toshihiro Shimizu |
890ddd |
//! tasks added through it - by copying the smart pointer to the id into each
|
|
Toshihiro Shimizu |
890ddd |
//! added task.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa Executor and Runnable class.
|
|
Toshihiro Shimizu |
890ddd |
class ExecutorId : public TSmartObject
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Toshihiro Shimizu |
890ddd |
size_t m_id;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int m_activeTasks;
|
|
Toshihiro Shimizu |
890ddd |
int m_maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int m_activeLoad;
|
|
Toshihiro Shimizu |
890ddd |
int m_maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
bool m_dedicatedThreads;
|
|
Toshihiro Shimizu |
890ddd |
bool m_persistentThreads;
|
|
Toshihiro Shimizu |
890ddd |
std::deque<worker *=""> m_sleepings;</worker>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
ExecutorId();
|
|
Toshihiro Shimizu |
890ddd |
~ExecutorId();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void accumulate(const RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void newWorker(RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
void refreshDedicatedList();
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Executor::Imp class
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! ExecutorImp both manages the allocation of worker threads for the
|
|
Toshihiro Shimizu |
890ddd |
//! execution of Runnable tasks and centralizes the tasks collection.
|
|
Toshihiro Shimizu |
890ddd |
//! One process only hosts one instance of the ExecutorImp class as a
|
|
Toshihiro Shimizu |
890ddd |
//! a global variable that needs to be allocated in an application-lasting
|
|
Toshihiro Shimizu |
890ddd |
//! and event-looped thread - typically the main thread in GUI applications.
|
|
Toshihiro Shimizu |
890ddd |
class ExecutorImp
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
public:
|
|
Toshihiro Shimizu |
890ddd |
QMultiMap<int, runnablep=""> m_tasks;</int,>
|
|
Toshihiro Shimizu |
890ddd |
std::set<worker *=""> m_workers; // Used just for debugging purposes</worker>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
tcg::indices_pool<> m_executorIdPool;
|
|
Toshihiro Shimizu |
890ddd |
std::vector<uchar> m_waitingFlagsPool;</uchar>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int m_activeLoad;
|
|
Toshihiro Shimizu |
890ddd |
int m_maxLoad;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
QMutex m_transitionMutex; // Workers' transition mutex
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
ExecutorImp();
|
|
Toshihiro Shimizu |
890ddd |
~ExecutorImp();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void insertTask(int schedulingPriority, RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void refreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline bool isExecutable(RunnableP &task);
|
|
Toshihiro Shimizu |
890ddd |
};
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
} // namespace TThread
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Global variables
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
namespace
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
ExecutorImp *globalImp = 0;
|
|
Toshihiro Shimizu |
890ddd |
ExecutorImpSlots *globalImpSlots = 0;
|
|
Toshihiro Shimizu |
890ddd |
bool shutdownVar = false;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=============================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorImp methods
|
|
Toshihiro Shimizu |
890ddd |
//-----------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Builds the executor manager with no active load and a maximum load of
//! 100 units for each processor reported by TSystem::getProcessorCount().
ExecutorImp::ExecutorImp()
    : m_activeLoad(0),
      m_maxLoad(TSystem::getProcessorCount() * 100),
      m_transitionMutex() //NOTE: We'll wait on this mutex - so it can't be recursive
{
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Performs no explicit cleanup; in particular, it does not wait for worker
//! threads (see the TODO list at the top of this file).
ExecutorImp::~ExecutorImp() {}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//A task is executable <==> its load allows it. The task load is considered
|
|
Toshihiro Shimizu |
890ddd |
//fixed until another isExecutable() call is made again - in case it may
|
|
Toshihiro Shimizu |
890ddd |
//change in time.
|
|
Toshihiro Shimizu |
890ddd |
inline bool ExecutorImp::isExecutable(RunnableP &task)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return m_activeLoad + task->m_load <= m_maxLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void ExecutorImp::insertTask(int schedulingPriority, RunnableP &task)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
task->m_schedulingPriority = schedulingPriority;
|
|
Toshihiro Shimizu |
890ddd |
m_tasks.insert(schedulingPriority, task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//========================
|
|
Toshihiro Shimizu |
890ddd |
// Runnable methods
|
|
Toshihiro Shimizu |
890ddd |
//------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Builds a task that is not bound to any ExecutorId yet (m_id is null).
Runnable::Runnable()
    : TSmartObject(m_classCode),
      m_id(0)
{
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Releases the reference held on the task's ExecutorId, if one is set.
Runnable::~Runnable()
{
  if (!m_id)
    return; // Nothing to release: no ExecutorId is attached

  m_id->release(); //see Executor::addTask()
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Returns the predicted CPU load generated by the task, expressed in percentage
|
|
Toshihiro Shimizu |
890ddd |
//! of core usage (that is, 100 is intended to fully occupy one processing core).
|
|
Toshihiro Shimizu |
890ddd |
//! Appropriate task load calibration is an important step to take when implementing
|
|
Toshihiro Shimizu |
890ddd |
//! a new task; for this purpose, remember some rules to follow:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
//! In every moment, the task manager ensures that the overall sum of the active
|
|
Toshihiro Shimizu |
890ddd |
//! task's load does not exceed the number of machine's processing cores multiplied
|
|
Toshihiro Shimizu |
890ddd |
//! by 100. This condition is \a blocking with respect to the execution of any other
|
|
Toshihiro Shimizu |
890ddd |
//! task - meaning that when a task is about to be executed the task manager \a waits
|
|
Toshihiro Shimizu |
890ddd |
//! until enough CPU resources are available to make it run.
|
|
Toshihiro Shimizu |
890ddd |
//! In particular, observe that a task's load \b never has to exceed the total CPU
|
|
Toshihiro Shimizu |
890ddd |
//! resources - doing so would surely result in a block of your application. The number
|
|
Toshihiro Shimizu |
890ddd |
//! of available cores can be accessed via the \b TSystem::getProcessorCount() or
|
|
Toshihiro Shimizu |
890ddd |
//! \b QThread::idealThreadCount().
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The task load is considered constant for the duration of the task. Changing its
|
|
Toshihiro Shimizu |
890ddd |
//! value does not affect the task manager in any way once the task has been started.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The default task load is 0, representing a very light task. If the task load
|
|
Toshihiro Shimizu |
890ddd |
//! is 0 the condition at point 1 always succeeds - so the task is always executed when
|
|
Toshihiro Shimizu |
890ddd |
//! encountered. Observe that a long succession of 0 task loads leads to the creation of
|
|
Toshihiro Shimizu |
890ddd |
//! a proportional number of threads simultaneously running to dispatch it; if this is
|
|
Toshihiro Shimizu |
890ddd |
//! a problem, consider the use of \b Executor::setMaxActiveTasks()
|
|
Toshihiro Shimizu |
890ddd |
//! to make only a certain number of tasks being executed at the same time.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
int Runnable::taskLoad()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return 0;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Returns the priority value used to schedule a task for execution. Tasks
|
|
Toshihiro Shimizu |
890ddd |
//! with higher priority start before tasks with lower priority. The default
|
|
Toshihiro Shimizu |
890ddd |
//! value returned is 5 (halfway from 0 to 10) - but any value other than
|
|
Toshihiro Shimizu |
890ddd |
//! (std::numeric_limits<int>::max)() is acceptable.</int>
|
|
Toshihiro Shimizu |
890ddd |
int Runnable::schedulingPriority()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return 5;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Returns the QThread::Priority used by worker threads when they adopt
|
|
Toshihiro Shimizu |
890ddd |
//! the task. The default value returned is QThread::Normal.
|
|
Toshihiro Shimizu |
890ddd |
QThread::Priority Runnable::runningPriority()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return QThread::NormalPriority;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline bool Runnable::customConditions()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return (m_id->m_activeTasks < m_id->m_maxActiveTasks) &&
|
|
Toshihiro Shimizu |
890ddd |
(m_id->m_activeLoad + m_load <= m_id->m_maxActiveLoad);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::started(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
This signal is emitted from working threads just before the run() code
|
|
Toshihiro Shimizu |
890ddd |
is executed. Observe that the passed smart pointer ensures the survival of
|
|
Toshihiro Shimizu |
890ddd |
the emitting task for the time required by connected slots execution.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\warning The started(), finished() and exception() signals are emitted in
|
|
Toshihiro Shimizu |
890ddd |
a mutex-protected environment in order to provide the correct sequence of
|
|
Toshihiro Shimizu |
890ddd |
emissions (i.e. so that canceled() and terminated() controller-emitted signals
|
|
Toshihiro Shimizu |
890ddd |
are either delivered after started() and before finished() or exception(),
|
|
Toshihiro Shimizu |
890ddd |
or \a instead of them).
|
|
Toshihiro Shimizu |
890ddd |
\warning Thus, setting up blocking connections or \a direct slots that contain a
|
|
Toshihiro Shimizu |
890ddd |
blocking instruction or even calls to the Executor API (which would definitely
|
|
Toshihiro Shimizu |
890ddd |
try to relock the aforementioned mutex) is dangerous and could result in an
|
|
Toshihiro Shimizu |
890ddd |
application freeze.
|
|
Toshihiro Shimizu |
890ddd |
\warning In case it's necessary to use blocking features, they should be enforced
|
|
Toshihiro Shimizu |
890ddd |
through custom signals to be invoked manually in the run() method, outside the
|
|
Toshihiro Shimizu |
890ddd |
mutex.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b finished and \b exception signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::finished(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b finished signal is emitted from working threads once the run()
|
|
Toshihiro Shimizu |
890ddd |
code is returned without unmanaged exceptions.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b started and \b exception signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::exception(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b exception signal is emitted from working threads whenever an
|
|
Toshihiro Shimizu |
890ddd |
untrapped exception is found within the run() method.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b started and \b finished signals.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::canceled(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b canceled signal is emitted from the controller thread whenever
|
|
Toshihiro Shimizu |
890ddd |
a task which is currently under the task manager's control is canceled
|
|
Toshihiro Shimizu |
890ddd |
by the user (the signal is emitted from the thread invoking the cancel).
|
|
Toshihiro Shimizu |
890ddd |
Observe that tasks under execution are not stopped by the task manager
|
|
Toshihiro Shimizu |
890ddd |
when they are canceled, but the signal is emitted anyway - helping the
|
|
Toshihiro Shimizu |
890ddd |
user to stop the actual execution of the run() code in advance.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b Executor::removeTask and \b Executor::cancelAll methods.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
/*!
|
|
Toshihiro Shimizu |
890ddd |
\fn void Runnable::terminated(RunnableP sender)
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
The \b terminated signal is emitted from the controller thread when
|
|
Toshihiro Shimizu |
890ddd |
the Executor components are shutting down inside a call to
|
|
Toshihiro Shimizu |
890ddd |
Executor::shutdown(). Implementing a slot connected to this signal
|
|
Toshihiro Shimizu |
890ddd |
helps the user in controlling the flow of an Executor-multithreaded
|
|
Toshihiro Shimizu |
890ddd |
application when it is shutting down - for example, it can be imposed
|
|
Toshihiro Shimizu |
890ddd |
that the application must wait for the task to be finished, print logs
|
|
Toshihiro Shimizu |
890ddd |
or similar.
|
|
Toshihiro Shimizu |
890ddd |
This signal is always preceded by a canceled() signal, informing all
|
|
Toshihiro Shimizu |
890ddd |
active tasks that they should begin quitting on their own in a 'soft'
|
|
Toshihiro Shimizu |
890ddd |
way before brute termination may occur.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
\sa \b Executor::shutdown static method and \b Runnable::canceled signal.
|
|
Toshihiro Shimizu |
890ddd |
*/
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Convenience slot for the started() signal - so it's not necessary to declare
|
|
Toshihiro Shimizu |
890ddd |
//! the task in a header file for moc'ing each time. You must both reimplement
|
|
Toshihiro Shimizu |
890ddd |
//! \b and connect it to the started() signal to make it work.
|
|
Toshihiro Shimizu |
890ddd |
void Runnable::onStarted(RunnableP)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogous of onStarted() for the finished() signal.
|
|
Toshihiro Shimizu |
890ddd |
void Runnable::onFinished(RunnableP)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogous of onStarted() for the exception() signal.
|
|
Toshihiro Shimizu |
890ddd |
void Runnable::onException(RunnableP)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogous of onStarted() for the canceled() signal.
|
|
Toshihiro Shimizu |
890ddd |
void Runnable::onCanceled(RunnableP)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! The analogous of onStarted() for the terminated() signal.
|
|
Toshihiro Shimizu |
890ddd |
void Runnable::onTerminated(RunnableP)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==========================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorId methods
|
|
Toshihiro Shimizu |
890ddd |
//--------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Builds an ExecutorId with no active tasks and no active load, at most one
//! simultaneously active task, the largest representable active load limit,
//! and neither dedicated nor persistent threads.
//! Under the transition mutex, the numeric id is acquired from the global id
//! pool and the global waiting-flags pool is resized to the id pool's size.
ExecutorId::ExecutorId()
    : m_activeTasks(0),
      m_maxActiveTasks(1),
      m_activeLoad(0),
      // Parenthesized so that a function-like max() macro cannot expand here.
      m_maxActiveLoad((std::numeric_limits<int>::max)()),
      m_dedicatedThreads(false),
      m_persistentThreads(false)
{
    QMutexLocker transitionLocker(&globalImp->m_transitionMutex);

    m_id = globalImp->m_executorIdPool.acquire();
    globalImp->m_waitingFlagsPool.resize(globalImp->m_executorIdPool.size());
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Under the transition mutex: if this id uses dedicated threads, drops the
//! persistence flag and refreshes the dedicated list so that sleeping workers
//! get released; then hands the numeric id back to the global id pool.
ExecutorId::~ExecutorId()
{
    QMutexLocker stateLock(&globalImp->m_transitionMutex);

    if (m_dedicatedThreads) {
        // With persistence off, refreshDedicatedList() wakes every sleeper.
        m_persistentThreads = false;
        refreshDedicatedList();
    }

    globalImp->m_executorIdPool.release(m_id);
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Make sure that sleeping workers are eliminated properly if the permanent
|
|
Toshihiro Shimizu |
890ddd |
//workers count decreases.
|
|
Toshihiro Shimizu |
890ddd |
void ExecutorId::refreshDedicatedList()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already covered
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!m_dedicatedThreads || !m_persistentThreads) {
|
|
Toshihiro Shimizu |
890ddd |
//Release all sleeping workers
|
|
Toshihiro Shimizu |
890ddd |
//Wake them - they will exit on their own
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
unsigned int i, size = m_sleepings.size();
|
|
Toshihiro Shimizu |
890ddd |
for (i = 0; i < size; ++i) {
|
|
Toshihiro Shimizu |
890ddd |
m_sleepings[i]->m_exit = true;
|
|
Toshihiro Shimizu |
890ddd |
m_sleepings[i]->m_waitCondition.wakeOne();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_sleepings.clear();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Worker methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Creates a worker thread with no task and no master, flagged to exit.
Worker::Worker()
    : QThread()
    , m_task(0)
    , m_master(0)
    , m_exit(true)
{
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Nothing to release explicitly.
Worker::~Worker()
{
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void Worker::run()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//Ensure atomicity of worker's state transitions
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker sl(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (shutdownVar)
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
for (;;) {
|
|
Toshihiro Shimizu |
890ddd |
//Run the taken task
|
|
Toshihiro Shimizu |
890ddd |
setPriority(m_task->runningPriority());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
try {
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT m_task->started(m_task);
|
|
Toshihiro Shimizu |
890ddd |
sl.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_task->run();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
sl.relock();
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT m_task->finished(m_task);
|
|
Toshihiro Shimizu |
890ddd |
} catch (...) {
|
|
Toshihiro Shimizu |
890ddd |
sl.relock(); //throw must be in the run() block
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT m_task->exception(m_task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
updateCountsOnRelease();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (shutdownVar)
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Get the next task
|
|
Toshihiro Shimizu |
890ddd |
takeTask();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!m_task) {
|
|
Toshihiro Shimizu |
890ddd |
onFinish();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!m_exit && !shutdownVar) {
|
|
Toshihiro Shimizu |
890ddd |
//Put the worker to sleep
|
|
Toshihiro Shimizu |
890ddd |
m_waitCondition.wait(sl.mutex());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Upon thread destruction the wait condition is implicitly woken up.
|
|
Toshihiro Shimizu |
890ddd |
//If this is the case, m_task == 0 and we return.
|
|
Toshihiro Shimizu |
890ddd |
if (!m_task || shutdownVar)
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
} else
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void Worker::updateCountsOnTake()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_activeLoad += m_task->m_load;
|
|
Toshihiro Shimizu |
890ddd |
m_task->m_id->m_activeLoad += m_task->m_load;
|
|
Toshihiro Shimizu |
890ddd |
++m_task->m_id->m_activeTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void Worker::updateCountsOnRelease()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_activeLoad -= m_task->m_load;
|
|
Toshihiro Shimizu |
890ddd |
m_task->m_id->m_activeLoad -= m_task->m_load;
|
|
Toshihiro Shimizu |
890ddd |
--m_task->m_id->m_activeTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline void Worker::onFinish()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
if (m_master && m_master->m_dedicatedThreads && m_master->m_persistentThreads) {
|
|
Toshihiro Shimizu |
890ddd |
m_exit = false;
|
|
Toshihiro Shimizu |
890ddd |
m_master->m_sleepings.push_back(this);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// Unlock the mutex - since eventually invoked ~ExecutorId will relock it...
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_transitionMutex.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_master = 0; //Master may be destroyed here - and m_exit= true for all sleepings
|
|
Toshihiro Shimizu |
890ddd |
//in that case
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_transitionMutex.lock();
|
|
Toshihiro Shimizu |
890ddd |
} else {
|
|
Toshihiro Shimizu |
890ddd |
m_exit = true;
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_workers.erase(this);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//===========================
|
|
Toshihiro Shimizu |
890ddd |
// Executor methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Creates the Executor together with a fresh ExecutorId, on which the
//! Executor immediately takes a reference.
Executor::Executor()
    : m_id(new ExecutorId)
{
    //Dropped again in ~Executor()
    m_id->addRef();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Drops the reference on the ExecutorId taken at construction.
Executor::~Executor()
{
    m_id->release();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! This static method declares the use of the Executor's task manager into
|
|
Toshihiro Shimizu |
890ddd |
//! the application code. Be sure to use it according to the following rules:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
//! Only QCoreApplications or QApplications may use Executors.
|
|
Toshihiro Shimizu |
890ddd |
//! This method must be invoked in a thread which performs constant Qt event
|
|
Toshihiro Shimizu |
890ddd |
//! processing - like the main loop of interactive GUI applications.
|
|
Toshihiro Shimizu |
890ddd |
//! No task processing is allowed after event processing stops.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
void Executor::init()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//If no global ExecutorImp exists, allocate it now. You may not move this
|
|
Toshihiro Shimizu |
890ddd |
//to a static declaration, since ExecutorImpSlots's connections must be
|
|
Toshihiro Shimizu |
890ddd |
//made once the QCoreApplication has been constructed.
|
|
Toshihiro Shimizu |
890ddd |
if (!globalImp) {
|
|
Toshihiro Shimizu |
890ddd |
globalImp = new ExecutorImp;
|
|
Toshihiro Shimizu |
890ddd |
globalImpSlots = new ExecutorImpSlots;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
qRegisterMetaType<tthread::runnablep>("TThread::RunnableP");</tthread::runnablep>
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! This static method, which \b must be invoked in the controller thread, declares
|
|
Toshihiro Shimizu |
890ddd |
//! termination of all Executor-based components, forcing the execution of tasks submitted
|
|
Toshihiro Shimizu |
890ddd |
//! by any Executor to quit as soon as possible in a safe way.
|
|
Toshihiro Shimizu |
890ddd |
//! When the shutdown method is invoked, the task manager first emits a canceled()
|
|
Toshihiro Shimizu |
890ddd |
//! signal for all the tasks that were submitted to it, independently from the Executor that
|
|
Toshihiro Shimizu |
890ddd |
//! performed the submission; then, tasks that are still active once all the cancellation signals
|
|
Toshihiro Shimizu |
890ddd |
//! were delivered further receive a terminated() signal informing them that they must provide
|
|
Toshihiro Shimizu |
890ddd |
//! code termination (or at least remain silent in a safe state until the application quits).
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! \b NOTE: Observe that this method does not explicitly wait for all the tasks to terminate - this depends
|
|
Toshihiro Shimizu |
890ddd |
//! on the code connected to the terminated() signal and is under the user's responsibility (see the
|
|
Toshihiro Shimizu |
890ddd |
//! remarks specified in started() signal descritpion); if this is the intent and the terminated slot
|
|
Toshihiro Shimizu |
890ddd |
//! is invoked in the controller thread, you should remember to implement a local event loop in it (so that
|
|
Toshihiro Shimizu |
890ddd |
//! event processing is still performed) and wait there until the first finished() or catched()
|
|
Toshihiro Shimizu |
890ddd |
//! slot make it quit.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::shutdown()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//Updating tasks list - lock against state transitions
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
shutdownVar = true;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Cancel all tasks - first the active ones
|
|
Toshihiro Shimizu |
890ddd |
std::set<worker *="">::iterator it;</worker>
|
|
Toshihiro Shimizu |
890ddd |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = (*it)->m_task;
|
|
Toshihiro Shimizu |
890ddd |
if (task)
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Finally, deal with the global queue tasks
|
|
Toshihiro Shimizu |
890ddd |
QMutableMapIterator<int, runnablep=""> jt(globalImp->m_tasks);</int,>
|
|
Toshihiro Shimizu |
890ddd |
while (jt.hasNext()) {
|
|
Toshihiro Shimizu |
890ddd |
jt.next();
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = jt.value();
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
jt.remove();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Now, send the terminate() signal to all active tasks
|
|
Toshihiro Shimizu |
890ddd |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = (*it)->m_task;
|
|
Toshihiro Shimizu |
890ddd |
if (task)
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->terminated(task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Just placing a convenience processEvents() to make sure that queued slots invoked by the
|
|
Toshihiro Shimizu |
890ddd |
//signals above are effectively invoked in this method - without having to return to an event loop.
|
|
Toshihiro Shimizu |
890ddd |
QCoreApplication::processEvents();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Specifies the use of dedicated threads for the Executor's task group.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! By default a worker thread attempts adoption of Runnable tasks
|
|
Toshihiro Shimizu |
890ddd |
//! without regard to the Executor that performed the submission. This helps
|
|
Toshihiro Shimizu |
890ddd |
//! in stabilizing the number of threads that are created and destroyed
|
|
Toshihiro Shimizu |
890ddd |
//! by the task manager - but may be a problem in some cases.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Using this method the user can explicitly tell the Executor to seize the
|
|
Toshihiro Shimizu |
890ddd |
//! ownership of worker threads assigned to its tasks, so that they will not
|
|
Toshihiro Shimizu |
890ddd |
//! try adoption of external tasks but instead remain focused on Executor's
|
|
Toshihiro Shimizu |
890ddd |
//! tasks only.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! An optional \b persistent parameter may be passed, which specifies if
|
|
Toshihiro Shimizu |
890ddd |
//! dedicated threads should remain sleeping or should rather die when no
|
|
Toshihiro Shimizu |
890ddd |
//! processable tasks from the Executor are found.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! This method is especially helpful in two occasions:
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
//! The Executor's tasks use thread-specific data such as QThreadStorages,
|
|
Toshihiro Shimizu |
890ddd |
//! which may be recycled among different tasks.
|
|
Toshihiro Shimizu |
890ddd |
//! The Executor receives tasks at a frequent rate, but mostly ends each one before
|
|
Toshihiro Shimizu |
890ddd |
//! another one is submitted - resulting in a continuous thread turnover.
|
|
Toshihiro Shimizu |
890ddd |
//!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void Executor::setDedicatedThreads(bool dedicated, bool persistent)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_id->m_dedicatedThreads = dedicated;
|
|
Toshihiro Shimizu |
890ddd |
m_id->m_persistentThreads = persistent;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_id->refreshDedicatedList();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Submits a task for execution. The task is executed according to
|
|
Toshihiro Shimizu |
890ddd |
//! its task load, insertion time and scheduling priority.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::addTask(RunnableP task)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
if (task->m_id) // Must be done outside transition lock, since eventually
|
|
Toshihiro Shimizu |
890ddd |
task->m_id->release(); // invoked ~ExecutorId will lock it
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Updating tasks and workers list - lock against state transitions
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
task->m_id = m_id;
|
|
Toshihiro Shimizu |
890ddd |
m_id->addRef();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImp->insertTask(task->schedulingPriority(), task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//If addTask is called in the main thread, the emit works directly -
|
|
Toshihiro Shimizu |
890ddd |
//so it is necessary to unlock the mutex *before* emitting the refresh.
|
|
Toshihiro Shimizu |
890ddd |
globalImpSlots->emitRefreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Removes the given task from scheduled execution and emits its
|
|
Toshihiro Shimizu |
890ddd |
//! Runnable::canceled signal. Tasks already under execution are not
|
|
Toshihiro Shimizu |
890ddd |
//! stopped by this method - although the canceled signal is still emitted.
|
|
Toshihiro Shimizu |
890ddd |
//! It has no effect if the task is not currently under the task manager's control.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa \b Runnable::canceled signal and the \b cancelAll method.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::removeTask(RunnableP task)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//If the task does not belong to this Executor, quit.
|
|
Toshihiro Shimizu |
890ddd |
if (task->m_id != m_id)
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Updating tasks list - lock against state transitions
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Then, look in the global queue - if it is found, emiminate the task and
|
|
Toshihiro Shimizu |
890ddd |
//send the canceled signal.
|
|
Toshihiro Shimizu |
890ddd |
if (globalImp->m_tasks.remove(task->m_schedulingPriority, task)) {
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Finally, the task may be running - look in workers.
|
|
Toshihiro Shimizu |
890ddd |
std::set<worker *=""> &workers = globalImp->m_workers;</worker>
|
|
Toshihiro Shimizu |
890ddd |
std::set<worker *="">::iterator it;</worker>
|
|
Toshihiro Shimizu |
890ddd |
for (it = workers.begin(); it != workers.end(); ++it)
|
|
Toshihiro Shimizu |
890ddd |
if (task && (*it)->m_task == task)
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//No need to refresh - tasks were eventually decremented...
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Clears the task manager of all tasks added by this Executor and emits
|
|
Toshihiro Shimizu |
890ddd |
//! the Runnable::canceled signal for each of them. The same specifications
|
|
Toshihiro Shimizu |
890ddd |
//! described in the \b removeTask method apply here.
|
|
Toshihiro Shimizu |
890ddd |
//! \sa \b Runnable::canceled signal and the \b removeTask method.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::cancelAll()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//Updating tasks list - lock against state transitions
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Clear the tasks chronologically. So, first check currently working
|
|
Toshihiro Shimizu |
890ddd |
//tasks.
|
|
Toshihiro Shimizu |
890ddd |
std::set<worker *="">::iterator it;</worker>
|
|
Toshihiro Shimizu |
890ddd |
for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) {
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = (*it)->m_task;
|
|
Toshihiro Shimizu |
890ddd |
if (task && task->m_id == m_id)
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Finally, clear the global tasks list from all tasks inserted by this executor
|
|
Toshihiro Shimizu |
890ddd |
//NOTE: An easier way here?
|
|
Toshihiro Shimizu |
890ddd |
QMutableMapIterator<int, runnablep=""> jt(globalImp->m_tasks);</int,>
|
|
Toshihiro Shimizu |
890ddd |
while (jt.hasNext()) {
|
|
Toshihiro Shimizu |
890ddd |
jt.next();
|
|
Toshihiro Shimizu |
890ddd |
if (jt.value()->m_id == m_id) {
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = jt.value();
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT task->canceled(task);
|
|
Toshihiro Shimizu |
890ddd |
jt.remove();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Declares that only a certain number of tasks added by this Executor
|
|
Toshihiro Shimizu |
890ddd |
//! may be processed simultaneously. The default is 1 - meaning that tasks
|
|
Toshihiro Shimizu |
890ddd |
//! added to the executor are completely serialized. A negative task number
|
|
Toshihiro Shimizu |
890ddd |
//! disables any form of task serialization.
|
|
Toshihiro Shimizu |
890ddd |
//! \b NOTE: Currently, tasks that do not
|
|
Toshihiro Shimizu |
890ddd |
//! satisfy this condition avoid blocking execution of tasks not
|
|
Toshihiro Shimizu |
890ddd |
//! added by the same Executor - even if they were scheduled for later
|
|
Toshihiro Shimizu |
890ddd |
//! execution.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::setMaxActiveTasks(int maxActiveTasks)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (maxActiveTasks <= 0)
|
|
Toshihiro Shimizu |
890ddd |
m_id->m_maxActiveTasks = (std::numeric_limits<int>::max)();</int>
|
|
Toshihiro Shimizu |
890ddd |
else
|
|
Toshihiro Shimizu |
890ddd |
m_id->m_maxActiveTasks = maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int Executor::maxActiveTasks() const
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
return m_id->m_maxActiveTasks;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Declares a maximum overall task load for the tasks added by this Executor.
|
|
Toshihiro Shimizu |
890ddd |
//! \b NOTE: The same remark for setMaxActiveTasks() holds here.
|
|
Toshihiro Shimizu |
890ddd |
void Executor::setMaxActiveLoad(int maxActiveLoad)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_id->m_maxActiveLoad = maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int Executor::maxActiveLoad() const
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
return m_id->m_maxActiveLoad;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//==================================
|
|
Toshihiro Shimizu |
890ddd |
// ExecutorImpSlots methods
|
|
Toshihiro Shimizu |
890ddd |
//----------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Connects the object's own refreshAssignments() signal to its
//! onRefreshAssignments() slot.
ExecutorImpSlots::ExecutorImpSlots()
{
    connect(this, SIGNAL(refreshAssignments()),
            this, SLOT(onRefreshAssignments()));
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Nothing to release explicitly.
ExecutorImpSlots::~ExecutorImpSlots()
{
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void ExecutorImpSlots::emitRefreshAssignments()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
Q_EMIT refreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void ExecutorImpSlots::onRefreshAssignments()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
QMutexLocker transitionLocker(&globalImp->m_transitionMutex);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImp->refreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void ExecutorImpSlots::onTerminated()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
delete QObject::sender();
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//=====================================================================
|
|
Toshihiro Shimizu |
890ddd |
// Task adoption methods
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//! Hands \b task over to a worker: a sleeping worker of this id is reused and
//! woken up if one is available, otherwise a brand new Worker is created,
//! registered in the global workers set and started.
inline void ExecutorId::newWorker(RunnableP &task)
{
    const bool reuseSleeper = !m_sleepings.empty();

    Worker *worker;
    if (reuseSleeper) {
        //Take the first sleeping worker out of the list
        worker = m_sleepings.front();
        m_sleepings.pop_front();
    } else {
        worker = new Worker;
        globalImp->m_workers.insert(worker);
        //The slot deletes the worker once its thread has finished
        QObject::connect(worker, SIGNAL(finished()), globalImpSlots, SLOT(onTerminated()));
    }

    worker->m_task = task;
    worker->updateCountsOnTake();

    if (reuseSleeper)
        worker->m_waitCondition.wakeOne();
    else
        worker->start();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Makes \b task this worker's current task, then lets updateCountsOnTake()
//refresh the bookkeeping associated with taking a task.
inline void Worker::adoptTask(RunnableP &task)
{
	this->m_task = task;
	this->updateCountsOnTake();
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// * TimedOut tasks are formerly accumulated tasks that do not satisfy the
//   standard conditions. Therefore they are blocking *everywhere*.

// * The refresh of the workers on accumulated tasks *must* happen:
//     -If and only if a task of the same executor finishes,
//      because that is the only case in which the custom conditions
//      get updated

// * If a dedicated thread cannot take a foreign task, it does not follow
//   that other threads cannot!
// * Tasks requiring dedication should be adopted by already dedicated
//   threads, if any exist! => Executors requiring dedicated threads create
//   them separately and do not share them with anyone: that is, threads created
//   without dedication must not be able to adopt tasks requiring dedication!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Assigns tasks polled from the id's accumulation queue (if id is given) and
|
|
Toshihiro Shimizu |
890ddd |
//the global tasks queue.
|
|
Toshihiro Shimizu |
890ddd |
//It works like:
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// a) First look if there exist tasks with timedOut priority and if so
|
|
Toshihiro Shimizu |
890ddd |
// try to take them out
|
|
Toshihiro Shimizu |
890ddd |
// b) Then look for tasks in the id's accumulation queue
|
|
Toshihiro Shimizu |
890ddd |
// c) Finally search in the remaining global tasks queue
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
void ExecutorImp::refreshAssignments()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
//QMutexLocker transitionLocker(&globalImp->m_transitionMutex); //Already covered
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (m_tasks.isEmpty())
|
|
Toshihiro Shimizu |
890ddd |
return;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// Erase the id vector data
|
|
Toshihiro Shimizu |
890ddd |
assert(m_executorIdPool.size() == m_waitingFlagsPool.size());
|
|
Toshihiro Shimizu |
890ddd |
memset(&m_waitingFlagsPool.front(), 0, m_waitingFlagsPool.size());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//c) Try with the global queue
|
|
Toshihiro Shimizu |
890ddd |
int e, executorsCount = m_executorIdPool.acquiredSize();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int i, tasksCount = m_tasks.size();
|
|
Toshihiro Shimizu |
890ddd |
QMultiMap<int, runnablep="">::iterator it;</int,>
|
|
Toshihiro Shimizu |
890ddd |
for (i = 0, e = 0, it = m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) {
|
|
Toshihiro Shimizu |
890ddd |
//std::cout<< "global tasks-refreshAss" << std::endl;
|
|
Toshihiro Shimizu |
890ddd |
//Take the task
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = it.value();
|
|
Toshihiro Shimizu |
890ddd |
task->m_load = task->taskLoad();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
UCHAR &idWaitingForAnotherTask = m_waitingFlagsPool[task->m_id->m_id];
|
|
Toshihiro Shimizu |
890ddd |
if (idWaitingForAnotherTask)
|
|
Toshihiro Shimizu |
890ddd |
continue;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!isExecutable(task))
|
|
Toshihiro Shimizu |
890ddd |
break;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!task->customConditions()) {
|
|
Toshihiro Shimizu |
890ddd |
++e;
|
|
Toshihiro Shimizu |
890ddd |
idWaitingForAnotherTask = 1;
|
|
Toshihiro Shimizu |
890ddd |
} else {
|
|
Toshihiro Shimizu |
890ddd |
task->m_id->newWorker(task);
|
|
Toshihiro Shimizu |
890ddd |
it = m_tasks.erase(it);
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
inline bool Worker::canAdopt(const RunnableP &task)
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
return task->m_id->m_sleepings.size() == 0 && //Always prefer sleeping dedicateds if present
|
|
Toshihiro Shimizu |
890ddd |
(!m_master || (m_master.getPointer() == task->m_id)); //If was seized by an Executor, ensure task compatibility
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//---------------------------------------------------------------------
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Takes a task and assigns it to the worker in a way similar to the one above.
|
|
Toshihiro Shimizu |
890ddd |
inline void Worker::takeTask()
|
|
Toshihiro Shimizu |
890ddd |
{
|
|
Toshihiro Shimizu |
890ddd |
TSmartPointerT<executorid> oldId = m_task->m_id;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//When a new task is taken, the old one's Executor may seize the worker
|
|
Toshihiro Shimizu |
890ddd |
m_master = oldId->m_dedicatedThreads ? oldId : (TSmartPointerT<executorid>)0;</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//c) No accumulated task can be taken - look for a task in the global tasks queue.
|
|
Toshihiro Shimizu |
890ddd |
// If the active load admits it, take the earliest task.
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Free the old task. NOTE: This instruction MUST be performed OUTSIDE the mutex-protected environment -
|
|
Toshihiro Shimizu |
890ddd |
//since user code may be executed upon task destruction - including the mutex relock!!
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_transitionMutex.unlock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
m_task = 0;
|
|
Toshihiro Shimizu |
890ddd |
oldId = TSmartPointerT<executorid>();</executorid>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImp->m_transitionMutex.lock();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
// Erase the executor id status pool
|
|
Toshihiro Shimizu |
890ddd |
tcg::indices_pool<> &executorIdPool = globalImp->m_executorIdPool;
|
|
Toshihiro Shimizu |
890ddd |
std::vector<uchar> &waitingFlagsPool = globalImp->m_waitingFlagsPool;</uchar>
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
assert(waitingFlagsPool.size() == globalImp->m_executorIdPool.size());
|
|
Toshihiro Shimizu |
890ddd |
memset(&waitingFlagsPool.front(), 0, waitingFlagsPool.size());
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int e, executorsCount = executorIdPool.acquiredSize();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
int i, tasksCount = globalImp->m_tasks.size();
|
|
Toshihiro Shimizu |
890ddd |
QMultiMap<int, runnablep="">::iterator it;</int,>
|
|
Toshihiro Shimizu |
890ddd |
for (i = 0, e = 0, it = globalImp->m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) {
|
|
Toshihiro Shimizu |
890ddd |
//std::cout<< "global tasks-takeTask" << std::endl;
|
|
Toshihiro Shimizu |
890ddd |
//Take the first task
|
|
Toshihiro Shimizu |
890ddd |
RunnableP task = it.value();
|
|
Toshihiro Shimizu |
890ddd |
task->m_load = task->taskLoad();
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
UCHAR &idWaitingForAnotherTask = waitingFlagsPool[task->m_id->m_id];
|
|
Toshihiro Shimizu |
890ddd |
if (idWaitingForAnotherTask)
|
|
Toshihiro Shimizu |
890ddd |
continue;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
if (!globalImp->isExecutable(task))
|
|
Toshihiro Shimizu |
890ddd |
break;
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//In case the worker was captured for dedication, check the task compatibility.
|
|
Toshihiro Shimizu |
890ddd |
if (!canAdopt(task)) {
|
|
Toshihiro Shimizu |
890ddd |
//some other worker may still take the task...
|
|
Toshihiro Shimizu |
890ddd |
globalImpSlots->emitRefreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
break;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
//Test its custom conditions
|
|
Toshihiro Shimizu |
890ddd |
if (!task->customConditions()) {
|
|
Toshihiro Shimizu |
890ddd |
++e;
|
|
Toshihiro Shimizu |
890ddd |
idWaitingForAnotherTask = 1;
|
|
Toshihiro Shimizu |
890ddd |
} else {
|
|
Toshihiro Shimizu |
890ddd |
adoptTask(task);
|
|
Toshihiro Shimizu |
890ddd |
it = globalImp->m_tasks.erase(it);
|
|
Toshihiro Shimizu |
890ddd |
|
|
Toshihiro Shimizu |
890ddd |
globalImpSlots->emitRefreshAssignments();
|
|
Toshihiro Shimizu |
890ddd |
break;
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|
|
Toshihiro Shimizu |
890ddd |
}
|