| |
| |
| #ifdef _DEBUG |
| #define _STLP_DEBUG 1 |
| #endif |
| |
| |
| #include "tthreadmessage.h" |
| #include "tsystem.h" |
| #include "tatomicvar.h" |
| |
| #include "tthread.h" |
| #include "tthreadp.h" |
| |
| |
| #include <set> |
| #include <deque> |
| |
| |
| #include "tcg/tcg_pool.h" |
| |
| |
| #include <QMultiMap> |
| #include <QMutableMapIterator> |
| #include <QMutex> |
| #include <QWaitCondition> |
| #include <QMetaType> |
| #include <QCoreApplication> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| using namespace TThread; |
| |
| DEFINE_CLASS_CODE(TThread::Runnable, 21) |
| |
| |
| |
| |
| |
| |
| |
| void TThread::init() |
| { |
| Executor::init(); |
| TThreadMessageDispatcher::init(); |
| } |
| |
| |
| |
| void TThread::shutdown() |
| { |
| Executor::shutdown(); |
| } |
| |
| |
| |
| namespace TThread |
| { |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| class Worker : public QThread |
| { |
| public: |
| RunnableP m_task; |
| |
| TSmartPointerT<ExecutorId> m_master; |
| |
| bool m_exit; |
| QWaitCondition m_waitCondition; |
| |
| Worker(); |
| ~Worker(); |
| |
| void run(); |
| |
| inline void takeTask(); |
| inline bool canAdopt(const RunnableP &task); |
| inline void adoptTask(RunnableP &task); |
| |
| inline void rest(); |
| |
| inline void updateCountsOnTake(); |
| inline void updateCountsOnRelease(); |
| |
| inline void onFinish(); |
| }; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| class ExecutorId : public TSmartObject |
| { |
| public: |
| size_t m_id; |
| |
| int m_activeTasks; |
| int m_maxActiveTasks; |
| |
| int m_activeLoad; |
| int m_maxActiveLoad; |
| |
| bool m_dedicatedThreads; |
| bool m_persistentThreads; |
| std::deque<Worker *> m_sleepings; |
| |
| ExecutorId(); |
| ~ExecutorId(); |
| |
| inline void accumulate(const RunnableP &task); |
| |
| void newWorker(RunnableP &task); |
| void refreshDedicatedList(); |
| }; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| class ExecutorImp |
| { |
| public: |
| QMultiMap<int, RunnableP> m_tasks; |
| std::set<Worker *> m_workers; |
| |
| tcg::indices_pool<> m_executorIdPool; |
| std::vector<UCHAR> m_waitingFlagsPool; |
| |
| int m_activeLoad; |
| int m_maxLoad; |
| |
| QMutex m_transitionMutex; |
| |
| ExecutorImp(); |
| ~ExecutorImp(); |
| |
| inline void insertTask(int schedulingPriority, RunnableP &task); |
| |
| void refreshAssignments(); |
| |
| inline bool isExecutable(RunnableP &task); |
| }; |
| |
| |
| |
| } |
| |
| |
| |
| |
| |
| |
| |
| namespace |
| { |
| ExecutorImp *globalImp = 0; |
| ExecutorImpSlots *globalImpSlots = 0; |
| bool shutdownVar = false; |
| } |
| |
| |
| |
| |
| |
| |
| |
| ExecutorImp::ExecutorImp() |
| : m_activeLoad(0), m_maxLoad(TSystem::getProcessorCount() * 100), m_transitionMutex() |
| { |
| } |
| |
| |
| |
| ExecutorImp::~ExecutorImp() |
| { |
| } |
| |
| |
| |
| |
| |
| |
| inline bool ExecutorImp::isExecutable(RunnableP &task) |
| { |
| return m_activeLoad + task->m_load <= m_maxLoad; |
| } |
| |
| |
| |
| inline void ExecutorImp::insertTask(int schedulingPriority, RunnableP &task) |
| { |
| task->m_schedulingPriority = schedulingPriority; |
| m_tasks.insert(schedulingPriority, task); |
| } |
| |
| |
| |
| |
| |
| |
| |
| Runnable::Runnable() |
| : TSmartObject(m_classCode), m_id(0) |
| { |
| } |
| |
| |
| |
| Runnable::~Runnable() |
| { |
| if (m_id) |
| m_id->release(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| int Runnable::taskLoad() |
| { |
| return 0; |
| } |
| |
| |
| |
| |
| |
| |
| |
| int Runnable::schedulingPriority() |
| { |
| return 5; |
| } |
| |
| |
| |
| |
| |
| QThread::Priority Runnable::runningPriority() |
| { |
| return QThread::NormalPriority; |
| } |
| |
| |
| |
| inline bool Runnable::customConditions() |
| { |
| return (m_id->m_activeTasks < m_id->m_maxActiveTasks) && |
| (m_id->m_activeLoad + m_load <= m_id->m_maxActiveLoad); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void Runnable::onStarted(RunnableP) |
| { |
| } |
| |
| |
| |
| |
| void Runnable::onFinished(RunnableP) |
| { |
| } |
| |
| |
| |
| |
| void Runnable::onException(RunnableP) |
| { |
| } |
| |
| |
| |
| |
| void Runnable::onCanceled(RunnableP) |
| { |
| } |
| |
| |
| |
| |
| void Runnable::onTerminated(RunnableP) |
| { |
| } |
| |
| |
| |
| |
| |
| |
| |
| ExecutorId::ExecutorId() |
| : m_activeTasks(0), m_maxActiveTasks(1), m_activeLoad(0), m_maxActiveLoad((std::numeric_limits<int>::max)()), m_dedicatedThreads(false), m_persistentThreads(false) |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| m_id = globalImp->m_executorIdPool.acquire(); |
| globalImp->m_waitingFlagsPool.resize(globalImp->m_executorIdPool.size()); |
| } |
| |
| |
| |
| ExecutorId::~ExecutorId() |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| if (m_dedicatedThreads) { |
| m_persistentThreads = 0; |
| refreshDedicatedList(); |
| } |
| |
| globalImp->m_executorIdPool.release(m_id); |
| } |
| |
| |
| |
| |
| |
| void ExecutorId::refreshDedicatedList() |
| { |
| |
| |
| if (!m_dedicatedThreads || !m_persistentThreads) { |
| |
| |
| |
| unsigned int i, size = m_sleepings.size(); |
| for (i = 0; i < size; ++i) { |
| m_sleepings[i]->m_exit = true; |
| m_sleepings[i]->m_waitCondition.wakeOne(); |
| } |
| |
| m_sleepings.clear(); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| Worker::Worker() |
| : QThread(), m_task(0), m_master(0), m_exit(true) |
| { |
| } |
| |
| |
| |
| Worker::~Worker() |
| { |
| } |
| |
| |
| |
| void Worker::run() |
| { |
| |
| QMutexLocker sl(&globalImp->m_transitionMutex); |
| |
| if (shutdownVar) |
| return; |
| |
| for (;;) { |
| |
| setPriority(m_task->runningPriority()); |
| |
| try { |
| Q_EMIT m_task->started(m_task); |
| sl.unlock(); |
| |
| m_task->run(); |
| |
| sl.relock(); |
| Q_EMIT m_task->finished(m_task); |
| } catch (...) { |
| sl.relock(); |
| Q_EMIT m_task->exception(m_task); |
| } |
| |
| updateCountsOnRelease(); |
| |
| if (shutdownVar) |
| return; |
| |
| |
| takeTask(); |
| |
| if (!m_task) { |
| onFinish(); |
| |
| if (!m_exit && !shutdownVar) { |
| |
| m_waitCondition.wait(sl.mutex()); |
| |
| |
| |
| if (!m_task || shutdownVar) |
| return; |
| } else |
| return; |
| } |
| } |
| } |
| |
| |
| |
| inline void Worker::updateCountsOnTake() |
| { |
| globalImp->m_activeLoad += m_task->m_load; |
| m_task->m_id->m_activeLoad += m_task->m_load; |
| ++m_task->m_id->m_activeTasks; |
| } |
| |
| |
| |
| inline void Worker::updateCountsOnRelease() |
| { |
| globalImp->m_activeLoad -= m_task->m_load; |
| m_task->m_id->m_activeLoad -= m_task->m_load; |
| --m_task->m_id->m_activeTasks; |
| } |
| |
| |
| |
| inline void Worker::onFinish() |
| { |
| if (m_master && m_master->m_dedicatedThreads && m_master->m_persistentThreads) { |
| m_exit = false; |
| m_master->m_sleepings.push_back(this); |
| |
| |
| globalImp->m_transitionMutex.unlock(); |
| |
| m_master = 0; |
| |
| |
| globalImp->m_transitionMutex.lock(); |
| } else { |
| m_exit = true; |
| globalImp->m_workers.erase(this); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| Executor::Executor() |
| : m_id(new ExecutorId) |
| { |
| m_id->addRef(); |
| } |
| |
| |
| |
| Executor::~Executor() |
| { |
| m_id->release(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void Executor::init() |
| { |
| |
| |
| |
| if (!globalImp) { |
| globalImp = new ExecutorImp; |
| globalImpSlots = new ExecutorImpSlots; |
| } |
| |
| qRegisterMetaType<TThread::RunnableP>("TThread::RunnableP"); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void Executor::shutdown() |
| { |
| { |
| |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| shutdownVar = true; |
| |
| |
| std::set<Worker *>::iterator it; |
| for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) { |
| RunnableP task = (*it)->m_task; |
| if (task) |
| Q_EMIT task->canceled(task); |
| } |
| |
| |
| QMutableMapIterator<int, RunnableP> jt(globalImp->m_tasks); |
| while (jt.hasNext()) { |
| jt.next(); |
| RunnableP task = jt.value(); |
| Q_EMIT task->canceled(task); |
| jt.remove(); |
| } |
| |
| |
| for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) { |
| RunnableP task = (*it)->m_task; |
| if (task) |
| Q_EMIT task->terminated(task); |
| } |
| } |
| |
| |
| |
| QCoreApplication::processEvents(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void Executor::setDedicatedThreads(bool dedicated, bool persistent) |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| m_id->m_dedicatedThreads = dedicated; |
| m_id->m_persistentThreads = persistent; |
| |
| m_id->refreshDedicatedList(); |
| } |
| |
| |
| |
| |
| |
| void Executor::addTask(RunnableP task) |
| { |
| { |
| if (task->m_id) |
| task->m_id->release(); |
| |
| |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| task->m_id = m_id; |
| m_id->addRef(); |
| |
| globalImp->insertTask(task->schedulingPriority(), task); |
| } |
| |
| |
| |
| globalImpSlots->emitRefreshAssignments(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| void Executor::removeTask(RunnableP task) |
| { |
| |
| if (task->m_id != m_id) |
| return; |
| |
| |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| |
| |
| if (globalImp->m_tasks.remove(task->m_schedulingPriority, task)) { |
| Q_EMIT task->canceled(task); |
| return; |
| } |
| |
| |
| std::set<Worker *> &workers = globalImp->m_workers; |
| std::set<Worker *>::iterator it; |
| for (it = workers.begin(); it != workers.end(); ++it) |
| if (task && (*it)->m_task == task) |
| Q_EMIT task->canceled(task); |
| |
| |
| } |
| |
| |
| |
| |
| |
| |
| |
| void Executor::cancelAll() |
| { |
| |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| |
| |
| std::set<Worker *>::iterator it; |
| for (it = globalImp->m_workers.begin(); it != globalImp->m_workers.end(); ++it) { |
| RunnableP task = (*it)->m_task; |
| if (task && task->m_id == m_id) |
| Q_EMIT task->canceled(task); |
| } |
| |
| |
| |
| QMutableMapIterator<int, RunnableP> jt(globalImp->m_tasks); |
| while (jt.hasNext()) { |
| jt.next(); |
| if (jt.value()->m_id == m_id) { |
| RunnableP task = jt.value(); |
| Q_EMIT task->canceled(task); |
| jt.remove(); |
| } |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void Executor::setMaxActiveTasks(int maxActiveTasks) |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| if (maxActiveTasks <= 0) |
| m_id->m_maxActiveTasks = (std::numeric_limits<int>::max)(); |
| else |
| m_id->m_maxActiveTasks = maxActiveTasks; |
| } |
| |
| |
| |
| int Executor::maxActiveTasks() const |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| return m_id->m_maxActiveTasks; |
| } |
| |
| |
| |
| |
| |
| void Executor::setMaxActiveLoad(int maxActiveLoad) |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| m_id->m_maxActiveLoad = maxActiveLoad; |
| } |
| |
| |
| |
| int Executor::maxActiveLoad() const |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| return m_id->m_maxActiveLoad; |
| } |
| |
| |
| |
| |
| |
| |
| |
| ExecutorImpSlots::ExecutorImpSlots() |
| { |
| connect(this, SIGNAL(refreshAssignments()), this, SLOT(onRefreshAssignments())); |
| } |
| |
| |
| |
| ExecutorImpSlots::~ExecutorImpSlots() |
| { |
| } |
| |
| |
| |
| void ExecutorImpSlots::emitRefreshAssignments() |
| { |
| Q_EMIT refreshAssignments(); |
| } |
| |
| |
| |
| void ExecutorImpSlots::onRefreshAssignments() |
| { |
| QMutexLocker transitionLocker(&globalImp->m_transitionMutex); |
| |
| globalImp->refreshAssignments(); |
| } |
| |
| |
| |
| void ExecutorImpSlots::onTerminated() |
| { |
| delete QObject::sender(); |
| } |
| |
| |
| |
| |
| |
| inline void ExecutorId::newWorker(RunnableP &task) |
| { |
| Worker *worker; |
| |
| if (m_sleepings.size()) { |
| worker = m_sleepings.front(); |
| m_sleepings.pop_front(); |
| worker->m_task = task; |
| worker->updateCountsOnTake(); |
| worker->m_waitCondition.wakeOne(); |
| } else { |
| worker = new Worker; |
| globalImp->m_workers.insert(worker); |
| QObject::connect(worker, SIGNAL(finished()), globalImpSlots, SLOT(onTerminated())); |
| worker->m_task = task; |
| worker->updateCountsOnTake(); |
| worker->start(); |
| } |
| } |
| |
| |
| |
| inline void Worker::adoptTask(RunnableP &task) |
| { |
| m_task = task; |
| updateCountsOnTake(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void ExecutorImp::refreshAssignments() |
| { |
| |
| |
| if (m_tasks.isEmpty()) |
| return; |
| |
| |
| assert(m_executorIdPool.size() == m_waitingFlagsPool.size()); |
| memset(&m_waitingFlagsPool.front(), 0, m_waitingFlagsPool.size()); |
| |
| |
| int e, executorsCount = m_executorIdPool.acquiredSize(); |
| |
| int i, tasksCount = m_tasks.size(); |
| QMultiMap<int, RunnableP>::iterator it; |
| for (i = 0, e = 0, it = m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) { |
| |
| |
| RunnableP task = it.value(); |
| task->m_load = task->taskLoad(); |
| |
| UCHAR &idWaitingForAnotherTask = m_waitingFlagsPool[task->m_id->m_id]; |
| if (idWaitingForAnotherTask) |
| continue; |
| |
| if (!isExecutable(task)) |
| break; |
| |
| if (!task->customConditions()) { |
| ++e; |
| idWaitingForAnotherTask = 1; |
| } else { |
| task->m_id->newWorker(task); |
| it = m_tasks.erase(it); |
| } |
| } |
| } |
| |
| |
| |
| inline bool Worker::canAdopt(const RunnableP &task) |
| { |
| return task->m_id->m_sleepings.size() == 0 && |
| (!m_master || (m_master.getPointer() == task->m_id)); |
| } |
| |
| |
| |
| |
| inline void Worker::takeTask() |
| { |
| TSmartPointerT<ExecutorId> oldId = m_task->m_id; |
| |
| |
| m_master = oldId->m_dedicatedThreads ? oldId : (TSmartPointerT<ExecutorId>)0; |
| |
| |
| |
| |
| |
| |
| |
| globalImp->m_transitionMutex.unlock(); |
| |
| m_task = 0; |
| oldId = TSmartPointerT<ExecutorId>(); |
| |
| globalImp->m_transitionMutex.lock(); |
| |
| |
| tcg::indices_pool<> &executorIdPool = globalImp->m_executorIdPool; |
| std::vector<UCHAR> &waitingFlagsPool = globalImp->m_waitingFlagsPool; |
| |
| assert(waitingFlagsPool.size() == globalImp->m_executorIdPool.size()); |
| memset(&waitingFlagsPool.front(), 0, waitingFlagsPool.size()); |
| |
| int e, executorsCount = executorIdPool.acquiredSize(); |
| |
| int i, tasksCount = globalImp->m_tasks.size(); |
| QMultiMap<int, RunnableP>::iterator it; |
| for (i = 0, e = 0, it = globalImp->m_tasks.end() - 1; i < tasksCount && e < executorsCount; ++i, --it) { |
| |
| |
| RunnableP task = it.value(); |
| task->m_load = task->taskLoad(); |
| |
| UCHAR &idWaitingForAnotherTask = waitingFlagsPool[task->m_id->m_id]; |
| if (idWaitingForAnotherTask) |
| continue; |
| |
| if (!globalImp->isExecutable(task)) |
| break; |
| |
| |
| if (!canAdopt(task)) { |
| |
| globalImpSlots->emitRefreshAssignments(); |
| break; |
| } |
| |
| |
| if (!task->customConditions()) { |
| ++e; |
| idWaitingForAnotherTask = 1; |
| } else { |
| adoptTask(task); |
| it = globalImp->m_tasks.erase(it); |
| |
| globalImpSlots->emitRefreshAssignments(); |
| break; |
| } |
| } |
| } |
| |