| |
| |
| #include "tthread.h" |
| #include "tthreadP.h" |
| |
| #include "boost/thread/thread.hpp" |
| #include "boost/thread/condition.hpp" |
| #include "boost/thread/tss.hpp" |
| #include "boost/thread/xtime.hpp" |
| #include <queue> |
| |
| #ifdef WIN32 |
| #include <windows.h> |
| #define WM_THREAD_NOTIFICATION (WM_USER + 10) |
| #else |
| |
| #endif |
| |
| DEFINE_CLASS_CODE(TThread::Runnable, 21) |
| |
| |
| |
| namespace |
| { |
| #ifdef WIN32 |
| HWND MainHandle; |
| #else |
| Display *TheDisplay; |
| Window TheMainWindow; |
| #endif |
| |
| |
| |
| class Thread |
| { |
| public: |
| Thread(); |
| Thread(const TThread::RunnableP &runnable); |
| |
| ~Thread(); |
| |
| void join(); |
| void cancel(); |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(disable : 4290) |
| #endif |
| |
| static void milestone() throw(TThread::Interrupt); |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(default : 4290) |
| #endif |
| |
| class Imp; |
| |
| private: |
| friend class ThreadGroup; |
| Imp *m_imp; |
| }; |
| |
| |
| |
| class ThreadGroup |
| { |
| public: |
| ThreadGroup(); |
| ~ThreadGroup(); |
| |
| void add(Thread *thread); |
| void joinAll(); |
| |
| private: |
| class Imp; |
| Imp *m_imp; |
| }; |
| |
| |
| |
| template <class T> |
| class QueueT |
| { |
| public: |
| QueueT(int slotCount) |
| : m_items(), m_slotCount(slotCount), m_notEmpty(), m_notFull(), m_mutex() |
| { |
| } |
| |
| ~QueueT() {} |
| |
| void put(const T &item) |
| { |
| TThread::ScopedLock sl(m_mutex); |
| while (m_items.size() == m_slotCount) |
| m_notFull.wait(sl); |
| m_items.push(item); |
| m_notEmpty.notify_one(); |
| } |
| |
| T get() |
| { |
| TThread::ScopedLock sl(m_mutex); |
| |
| while (m_items.size() == 0) |
| m_notEmpty.wait(sl); |
| |
| m_notFull.notify_one(); |
| T item = m_items.front(); |
| m_items.pop(); |
| return item; |
| } |
| |
| int size() |
| { |
| TThread::ScopedLock sl(m_mutex); |
| int size = m_items.size(); |
| return size; |
| } |
| |
| private: |
| std::queue<T> m_items; |
| TThread::Condition m_notEmpty; |
| TThread::Condition m_notFull; |
| TThread::Mutex m_mutex; |
| int m_slotCount; |
| }; |
| |
| } |
| |
| |
| |
| void TThread::setMainThreadId(TThread::ThreadInfo *info) |
| { |
| assert(info); |
| #ifdef WIN32 |
| MainHandle = info->mainHandle; |
| #else |
| TheDisplay = info->dpy; |
| TheMainWindow = info->win; |
| #endif |
| } |
| |
| |
| |
| #ifdef WIN32 |
| ULONG TThread::getMainShellHandle() |
| { |
| return ULONG(MainHandle); |
| } |
| #endif |
| |
| |
| |
| class BoostRunnable |
| { |
| public: |
| BoostRunnable(const TThread::RunnableP &runnable, Thread::Imp *threadImp) |
| : m_runnable(runnable), m_threadImp(threadImp) {} |
| |
| void operator()(); |
| |
| TThread::RunnableP m_runnable; |
| Thread::Imp *m_threadImp; |
| }; |
| |
| |
| |
| class Thread::Imp |
| { |
| public: |
| Imp() : m_boostThread(0), m_isCanceled(false), m_stateMutex() {} |
| |
| Imp(const TThread::RunnableP &runnable) |
| : m_isCanceled(false), m_stateMutex() |
| { |
| m_boostThread = new boost::thread(BoostRunnable(runnable, this)); |
| } |
| |
| ~Imp() |
| { |
| if (m_boostThread) |
| delete m_boostThread; |
| } |
| |
| boost::thread *m_boostThread; |
| boost::mutex m_stateMutex; |
| bool m_isCanceled; |
| long m_kkkk; |
| |
| enum State { |
| Running, |
| Canceled |
| }; |
| |
| static boost::mutex m_mutex; |
| static std::map<long, State> m_state; |
| |
| class Key |
| { |
| public: |
| Key(long id) : m_id(id) {} |
| long m_id; |
| }; |
| |
| static boost::thread_specific_ptr<Key> m_key; |
| }; |
| |
| boost::mutex Thread::Imp::m_mutex; |
| std::map<long, Thread::Imp::State> Thread::Imp::m_state; |
| |
| boost::thread_specific_ptr<Thread::Imp::Key> Thread::Imp::m_key; |
| |
| void BoostRunnable::operator()() |
| { |
| Thread::Imp::m_key.reset(new Thread::Imp::Key(reinterpret_cast<long>(this))); |
| m_threadImp->m_kkkk = reinterpret_cast<long>(this); |
| |
| { |
| boost::mutex::scoped_lock sl(Thread::Imp::m_mutex); |
| Thread::Imp::m_state[m_threadImp->m_kkkk] = Thread::Imp::Running; |
| } |
| |
| assert(m_runnable); |
| m_runnable->run(); |
| |
| { |
| boost::mutex::scoped_lock sl(Thread::Imp::m_mutex); |
| Thread::Imp::m_state.erase(reinterpret_cast<long>(this)); |
| } |
| } |
| |
| |
| |
| Thread::Thread() : m_imp(new Thread::Imp) |
| { |
| } |
| |
| |
| |
| Thread::Thread(const TThread::RunnableP &runnable) |
| : m_imp(new Imp(runnable)) |
| { |
| } |
| |
| |
| |
| Thread::~Thread() |
| { |
| delete m_imp; |
| } |
| |
| |
| |
| void Thread::join() |
| { |
| assert(m_imp->m_boostThread); |
| m_imp->m_boostThread->join(); |
| } |
| |
| |
| |
| void Thread::cancel() |
| { |
| boost::mutex::scoped_lock sl(Thread::Imp::m_mutex); |
| |
| std::map<long, Thread::Imp::State>::iterator it = Thread::Imp::m_state.find(m_imp->m_kkkk); |
| if (it != Thread::Imp::m_state.end()) |
| Thread::Imp::m_state[m_imp->m_kkkk] = Thread::Imp::Canceled; |
| } |
| |
| |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(disable : 4290) |
| #endif |
| |
| void Thread::milestone() throw(TThread::Interrupt) |
| { |
| boost::mutex::scoped_lock sl(Thread::Imp::m_mutex); |
| Thread::Imp::Key key = *Thread::Imp::m_key.get(); |
| Thread::Imp::m_state.find(key.m_id); |
| |
| if (Thread::Imp::m_state[key.m_id]) |
| throw TThread::Interrupt(); |
| } |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(default : 4290) |
| #endif |
| |
| |
| |
| class ThreadGroup::Imp |
| { |
| public: |
| Imp() : m_boostThreadGroup() {} |
| ~Imp() {} |
| |
| boost::thread_group m_boostThreadGroup; |
| }; |
| |
| ThreadGroup::ThreadGroup() : m_imp(new Imp) |
| { |
| } |
| |
| ThreadGroup::~ThreadGroup() |
| { |
| delete m_imp; |
| } |
| |
| void ThreadGroup::add(Thread *thread) |
| { |
| m_imp->m_boostThreadGroup.add_thread(thread->m_imp->m_boostThread); |
| } |
| |
| void ThreadGroup::joinAll() |
| { |
| m_imp->m_boostThreadGroup.join_all(); |
| } |
| |
| |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(disable : 4290) |
| #endif |
| |
| void TThread::milestone() throw(TThread::Interrupt) |
| { |
| Thread::milestone(); |
| } |
| |
| #if defined(WIN32) && (_MSC_VER == 1200) |
| #pragma warning(default : 4290) |
| #endif |
| |
| |
| |
| class TThread::Mutex::Imp |
| { |
| public: |
| boost::mutex m_mutex; |
| |
| Imp() : m_mutex() {} |
| ~Imp() {} |
| }; |
| |
| TThread::Mutex::Mutex() : m_imp(new Imp) |
| { |
| } |
| |
| TThread::Mutex::~Mutex() |
| { |
| delete m_imp; |
| } |
| |
| |
| |
| class TThread::ScopedLock::Imp |
| { |
| public: |
| boost::mutex::scoped_lock *m_sl; |
| |
| Imp(boost::mutex &mutex) : m_sl(new boost::mutex::scoped_lock(mutex)) {} |
| |
| ~Imp() |
| { |
| m_sl->unlock(); |
| delete m_sl; |
| } |
| }; |
| |
| TThread::ScopedLock::ScopedLock(Mutex &mutex) : m_imp(new Imp(mutex.m_imp->m_mutex)) {} |
| |
| TThread::ScopedLock::~ScopedLock() |
| { |
| delete m_imp; |
| } |
| |
| |
| |
| class TThread::Condition::Imp |
| { |
| public: |
| boost::condition m_condition; |
| |
| Imp() : m_condition() {} |
| ~Imp() {} |
| }; |
| |
| TThread::Condition::Condition() |
| : m_imp(new Imp()) {} |
| |
| TThread::Condition::~Condition() |
| { |
| delete m_imp; |
| } |
| |
| void TThread::Condition::wait(ScopedLock &lock) |
| { |
| m_imp->m_condition.wait(*(lock.m_imp->m_sl)); |
| } |
| |
| bool TThread::Condition::wait(ScopedLock &lock, long timeout) |
| { |
| boost::xtime xt; |
| boost::xtime_get(&xt, boost::TIME_UTC); |
| xt.nsec += timeout * 1000; |
| xt.sec += timeout / 1000; |
| return m_imp->m_condition.timed_wait(*(lock.m_imp->m_sl), xt); |
| } |
| |
| void TThread::Condition::notifyOne() |
| { |
| m_imp->m_condition.notify_one(); |
| } |
| |
| void TThread::Condition::notifyAll() |
| { |
| m_imp->m_condition.notify_all(); |
| } |
| |
| |
| |
| TThread::Msg::Msg() |
| { |
| } |
| |
| |
| |
| void TThread::Msg::send() |
| { |
| Msg *msg = clone(); |
| |
| #ifdef WIN32 |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| PostMessage(HWND(getMainShellHandle()), WM_THREAD_NOTIFICATION, WPARAM(msg), 0); |
| #else |
| XClientMessageEvent clientMsg; |
| clientMsg.type = ClientMessage; |
| clientMsg.window = TheMainWindow; |
| clientMsg.format = 32; |
| clientMsg.message_type = Msg::MsgId(); |
| clientMsg.data.l[0] = (long)msg; |
| |
| XSendEvent(TheDisplay, TheMainWindow, 0, NoEventMask, (XEvent *)&clientMsg); |
| XFlush(TheDisplay); |
| #endif |
| } |
| |
| |
| |
| |
| UINT TThread::Msg::MsgId() |
| { |
| #ifdef WIN32 |
| return WM_THREAD_NOTIFICATION; |
| #else |
| static Atom atom = 0; |
| if (!atom) { |
| atom = XInternAtom(TheDisplay, "ThreadMessage", false); |
| assert(atom); |
| } |
| return atom; |
| #endif |
| } |
| |
| |
| |
| class TThread::Executor::Imp : public TSmartObject |
| { |
| public: |
| typedef TSmartPointerT<TThread::Executor::Imp> ImpP; |
| |
| |
| |
| class Worker : public Runnable |
| { |
| public: |
| Worker(ImpP owner) : Runnable(), m_owner(owner) {} |
| ~Worker() {} |
| |
| void run(); |
| void doCleanup(); |
| |
| ImpP m_owner; |
| }; |
| |
| |
| |
| bool m_suspend; |
| bool m_threadHasToDie; |
| |
| UINT m_threadCount; |
| Mutex m_mutex; |
| |
| Condition m_cond; |
| Condition m_taskQueueEmpty; |
| |
| |
| Condition m_taskQueueNotEmpty; |
| |
| |
| std::queue<TThread::RunnableP> m_tasks; |
| std::map<long, Thread *> m_workerThreads; |
| |
| Imp(int threadsCount, bool suspend) |
| : TSmartObject(), m_suspend(suspend), m_threadHasToDie(false), m_threadCount(threadsCount), m_tasks(), m_workerThreads(), m_mutex(), m_cond(){}; |
| |
| ~Imp() |
| { |
| } |
| }; |
| |
| |
| |
| TThread::Executor::Executor(int threadsCount, bool suspend) : m_imp(new Imp(threadsCount, suspend)) |
| { |
| m_imp->addRef(); |
| } |
| |
| |
| |
| TThread::Executor::~Executor() |
| { |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| |
| if (m_imp->m_suspend) { |
| m_imp->m_threadHasToDie = true; |
| m_imp->m_taskQueueNotEmpty.notifyAll(); |
| } |
| } |
| |
| m_imp->release(); |
| } |
| |
| |
| |
| void TThread::Executor::Imp::Worker::run() |
| { |
| try { |
| while (true) { |
| |
| Thread::milestone(); |
| |
| |
| RunnableP task = 0; |
| |
| { |
| ScopedLock sl(m_owner->m_mutex); |
| if (m_owner->m_tasks.empty()) { |
| |
| if (m_owner->m_suspend) { |
| |
| m_owner->m_taskQueueNotEmpty.wait(sl); |
| |
| |
| if (m_owner->m_threadHasToDie) { |
| doCleanup(); |
| return; |
| } |
| } else { |
| |
| doCleanup(); |
| return; |
| } |
| } |
| |
| if (!m_owner->m_tasks.empty()) { |
| task = m_owner->m_tasks.front(); |
| m_owner->m_tasks.pop(); |
| } |
| } |
| |
| if (task) |
| task->run(); |
| |
| |
| Thread::milestone(); |
| } |
| } catch (TThread::Interrupt &) { |
| |
| } catch (...) { |
| |
| |
| |
| |
| ScopedLock sl(m_owner->m_mutex); |
| doCleanup(); |
| } |
| } |
| |
| |
| |
| void TThread::Executor::Imp::Worker::doCleanup() |
| { |
| std::map<long, Thread *>::iterator it = m_owner->m_workerThreads.find(reinterpret_cast<long>(this)); |
| |
| if (it != m_owner->m_workerThreads.end()) { |
| Thread *thread = it->second; |
| delete thread; |
| m_owner->m_workerThreads.erase(it); |
| } |
| |
| if (m_owner->m_workerThreads.size() == 0) |
| m_owner->m_taskQueueEmpty.notifyAll(); |
| } |
| |
| |
| |
| void TThread::Executor::addTask(const RunnableP &task) |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| m_imp->m_tasks.push(task); |
| if (m_imp->m_workerThreads.size() < m_imp->m_threadCount) { |
| TThread::Executor::Imp::Worker *worker = |
| new TThread::Executor::Imp::Worker(m_imp); |
| |
| m_imp->m_workerThreads[reinterpret_cast<long>(worker)] = new Thread(worker); |
| } else { |
| if (m_imp->m_suspend) |
| |
| m_imp->m_taskQueueNotEmpty.notifyOne(); |
| } |
| } |
| |
| |
| |
| void TThread::Executor::clear() |
| { |
| ScopedLock sl(m_imp->m_mutex); |
| |
| while (!m_imp->m_tasks.empty()) |
| m_imp->m_tasks.pop(); |
| } |
| |
| |
| |
| void TThread::Executor::cancel() |
| { |
| { |
| ScopedLock sl(m_imp->m_mutex); |
| |
| while (!m_imp->m_tasks.empty()) |
| m_imp->m_tasks.pop(); |
| } |
| |
| while (true) { |
| Thread *thread = 0; |
| { |
| ScopedLock sl(m_imp->m_mutex); |
| if (m_imp->m_workerThreads.empty()) |
| break; |
| else { |
| std::map<long, Thread *>::iterator it = m_imp->m_workerThreads.begin(); |
| thread = it->second; |
| m_imp->m_workerThreads.erase(it); |
| |
| if (thread) |
| thread->cancel(); |
| } |
| } |
| } |
| } |
| |
| |
| |
| void TThread::Executor::wait() |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| |
| while (m_imp->m_workerThreads.size()) |
| m_imp->m_taskQueueEmpty.wait(sl); |
| } |
| |
| |
| |
| bool TThread::Executor::wait(long timeout) |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| |
| bool expired = false; |
| while (m_imp->m_workerThreads.size()) |
| expired = m_imp->m_taskQueueEmpty.wait(sl, timeout); |
| |
| return expired; |
| } |
| |
| |
| |
| int TThread::Executor::getThreadCount() |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| return m_imp->m_workerThreads.size(); |
| } |
| |
| |
| |
| int TThread::Executor::getTaskCount() |
| { |
| TThread::ScopedLock sl(m_imp->m_mutex); |
| return m_imp->m_tasks.size(); |
| } |
| |