Blob Blame Raw


#include "tthread.h"
#include "tthreadP.h"

#include "boost/thread/thread.hpp"
#include "boost/thread/condition.hpp"
#include "boost/thread/tss.hpp"
#include "boost/thread/xtime.hpp"
#include <queue>

#ifdef WIN32
#include <windows.h>
#define WM_THREAD_NOTIFICATION (WM_USER + 10)
#else

#endif

DEFINE_CLASS_CODE(TThread::Runnable, 21)

//==============================================================================

namespace
{
#ifdef WIN32
HWND MainHandle;
#else
Display *TheDisplay;
Window TheMainWindow;
#endif

//------------------------------------------------------------------------------

class Thread
{
public:
	Thread();
	Thread(const TThread::RunnableP &runnable);

	~Thread();

	void join();
	void cancel();

#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif

	static void milestone() throw(TThread::Interrupt);

#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif

	class Imp;

private:
	friend class ThreadGroup;
	Imp *m_imp;
};

//------------------------------------------------------------------------------

class ThreadGroup
{
public:
	ThreadGroup();
	~ThreadGroup();

	void add(Thread *thread);
	void joinAll();

private:
	class Imp;
	Imp *m_imp;
};

//------------------------------------------------------------------------------

template <class T>
class QueueT
{
public:
	QueueT(int slotCount)
		: m_items(), m_slotCount(slotCount), m_notEmpty(), m_notFull(), m_mutex()
	{
	}

	~QueueT() {}

	void put(const T &item)
	{
		TThread::ScopedLock sl(m_mutex);
		while (m_items.size() == m_slotCount)
			m_notFull.wait(sl);
		m_items.push(item);
		m_notEmpty.notify_one();
	}

	T get()
	{
		TThread::ScopedLock sl(m_mutex);

		while (m_items.size() == 0)
			m_notEmpty.wait(sl);

		m_notFull.notify_one();
		T item = m_items.front();
		m_items.pop();
		return item;
	}

	int size()
	{
		TThread::ScopedLock sl(m_mutex);
		int size = m_items.size();
		return size;
	}

private:
	std::queue<T> m_items;
	TThread::Condition m_notEmpty;
	TThread::Condition m_notFull;
	TThread::Mutex m_mutex;
	int m_slotCount;
};

} // anonymous namespace

//------------------------------------------------------------------------------

void TThread::setMainThreadId(TThread::ThreadInfo *info)
{
	assert(info);
#ifdef WIN32
	MainHandle = info->mainHandle;
#else
	TheDisplay = info->dpy;
	TheMainWindow = info->win;
#endif
}

//------------------------------------------------------------------------------

#ifdef WIN32
ULONG TThread::getMainShellHandle()
{
	return ULONG(MainHandle);
}
#endif

//==============================================================================

class BoostRunnable
{
public:
	BoostRunnable(const TThread::RunnableP &runnable, Thread::Imp *threadImp)
		: m_runnable(runnable), m_threadImp(threadImp) {}

	void operator()();

	TThread::RunnableP m_runnable;
	Thread::Imp *m_threadImp;
};

//==============================================================================

class Thread::Imp
{
public:
	Imp() : m_boostThread(0), m_isCanceled(false), m_stateMutex() {}

	Imp(const TThread::RunnableP &runnable)
		: m_isCanceled(false), m_stateMutex()
	{
		m_boostThread = new boost::thread(BoostRunnable(runnable, this));
	}

	~Imp()
	{
		if (m_boostThread)
			delete m_boostThread;
	}

	boost::thread *m_boostThread;
	boost::mutex m_stateMutex;
	bool m_isCanceled;
	long m_kkkk;

	enum State {
		Running,
		Canceled
	};

	static boost::mutex m_mutex;
	static std::map<long, State> m_state;

	class Key
	{
	public:
		Key(long id) : m_id(id) {}
		long m_id;
	};

	static boost::thread_specific_ptr<Key> m_key;
};

boost::mutex Thread::Imp::m_mutex;
std::map<long, Thread::Imp::State> Thread::Imp::m_state;

boost::thread_specific_ptr<Thread::Imp::Key> Thread::Imp::m_key;

void BoostRunnable::operator()()
{
	Thread::Imp::m_key.reset(new Thread::Imp::Key(reinterpret_cast<long>(this)));
	m_threadImp->m_kkkk = reinterpret_cast<long>(this);

	{
		boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
		Thread::Imp::m_state[m_threadImp->m_kkkk] = Thread::Imp::Running;
	}

	assert(m_runnable);
	m_runnable->run();

	{
		boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
		Thread::Imp::m_state.erase(reinterpret_cast<long>(this));
	}
}

//==============================================================================

Thread::Thread() : m_imp(new Thread::Imp)
{
}

//------------------------------------------------------------------------------

Thread::Thread(const TThread::RunnableP &runnable)
	: m_imp(new Imp(runnable))
{
}

//------------------------------------------------------------------------------

Thread::~Thread()
{
	delete m_imp;
}

//------------------------------------------------------------------------------

void Thread::join()
{
	assert(m_imp->m_boostThread);
	m_imp->m_boostThread->join();
}

//------------------------------------------------------------------------------

void Thread::cancel()
{
	boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);

	std::map<long, Thread::Imp::State>::iterator it = Thread::Imp::m_state.find(m_imp->m_kkkk);
	if (it != Thread::Imp::m_state.end())
		Thread::Imp::m_state[m_imp->m_kkkk] = Thread::Imp::Canceled;
}

//------------------------------------------------------------------------------
// static member function
#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif

void Thread::milestone() throw(TThread::Interrupt)
{
	boost::mutex::scoped_lock sl(Thread::Imp::m_mutex);
	Thread::Imp::Key key = *Thread::Imp::m_key.get();
	Thread::Imp::m_state.find(key.m_id);

	if (Thread::Imp::m_state[key.m_id])
		throw TThread::Interrupt();
}

#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif

//==============================================================================

class ThreadGroup::Imp
{
public:
	Imp() : m_boostThreadGroup() {}
	~Imp() {}

	boost::thread_group m_boostThreadGroup;
};

ThreadGroup::ThreadGroup() : m_imp(new Imp)
{
}

ThreadGroup::~ThreadGroup()
{
	delete m_imp;
}

void ThreadGroup::add(Thread *thread)
{
	m_imp->m_boostThreadGroup.add_thread(thread->m_imp->m_boostThread);
}

void ThreadGroup::joinAll()
{
	m_imp->m_boostThreadGroup.join_all();
}

//==============================================================================

#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(disable : 4290)
#endif

void TThread::milestone() throw(TThread::Interrupt)
{
	Thread::milestone();
}

#if defined(WIN32) && (_MSC_VER == 1200)
#pragma warning(default : 4290)
#endif

//==============================================================================

class TThread::Mutex::Imp
{
public:
	boost::mutex m_mutex;

	Imp() : m_mutex() {}
	~Imp() {}
};

TThread::Mutex::Mutex() : m_imp(new Imp)
{
}

TThread::Mutex::~Mutex()
{
	delete m_imp;
}

//==============================================================================

class TThread::ScopedLock::Imp
{
public:
	boost::mutex::scoped_lock *m_sl;

	Imp(boost::mutex &mutex) : m_sl(new boost::mutex::scoped_lock(mutex)) {}

	~Imp()
	{
		m_sl->unlock();
		delete m_sl;
	}
};

TThread::ScopedLock::ScopedLock(Mutex &mutex) : m_imp(new Imp(mutex.m_imp->m_mutex)) {}

TThread::ScopedLock::~ScopedLock()
{
	delete m_imp;
}

//==============================================================================

class TThread::Condition::Imp
{
public:
	boost::condition m_condition;

	Imp() : m_condition() {}
	~Imp() {}
};

TThread::Condition::Condition()
	: m_imp(new Imp()) {}

TThread::Condition::~Condition()
{
	delete m_imp;
}

void TThread::Condition::wait(ScopedLock &lock)
{
	m_imp->m_condition.wait(*(lock.m_imp->m_sl));
}

bool TThread::Condition::wait(ScopedLock &lock, long timeout)
{
	boost::xtime xt;
	boost::xtime_get(&xt, boost::TIME_UTC);
	xt.nsec += timeout * 1000;
	xt.sec += timeout / 1000;
	return m_imp->m_condition.timed_wait(*(lock.m_imp->m_sl), xt);
}

void TThread::Condition::notifyOne()
{
	m_imp->m_condition.notify_one();
}

void TThread::Condition::notifyAll()
{
	m_imp->m_condition.notify_all();
}

//==============================================================================

TThread::Msg::Msg()
{
}

//------------------------------------------------------------------------------

void TThread::Msg::send()
{
	Msg *msg = clone();

#ifdef WIN32
	/*
Non viene utilizzato PostThreadMessage perche' se l'applicazione 
si trova in un modal loop (esempio MessageBox) oppure si sta 
facendo move o resize di una finestra i messaggi non giungono al
message loop.
http://support.microsoft.com/default.aspx?scid=KB;EN-US;q183116& 
*/

	/*
  BOOL rc = PostThreadMessage(
    getMainThreadId(),      // thread identifier
    WM_THREAD_NOTIFICATION, // message
    WPARAM(msg),            // first message parameter
    0);                     // second message parameter   
*/
	PostMessage(HWND(getMainShellHandle()), WM_THREAD_NOTIFICATION, WPARAM(msg), 0);
#else
	XClientMessageEvent clientMsg;
	clientMsg.type = ClientMessage;
	clientMsg.window = TheMainWindow;
	clientMsg.format = 32;
	clientMsg.message_type = Msg::MsgId();
	clientMsg.data.l[0] = (long)msg;
	//Status status =
	XSendEvent(TheDisplay, TheMainWindow, 0, NoEventMask, (XEvent *)&clientMsg);
	XFlush(TheDisplay);
#endif
}

//------------------------------------------------------------------------------

// statica
UINT TThread::Msg::MsgId()
{
#ifdef WIN32
	return WM_THREAD_NOTIFICATION;
#else
	static Atom atom = 0;
	if (!atom) {
		atom = XInternAtom(TheDisplay, "ThreadMessage", false);
		assert(atom);
	}
	return atom;
#endif
}

//==============================================================================

class TThread::Executor::Imp : public TSmartObject
{
public:
	typedef TSmartPointerT<TThread::Executor::Imp> ImpP;

	//---------------------------------------------------

	class Worker : public Runnable
	{
	public:
		Worker(ImpP owner) : Runnable(), m_owner(owner) {}
		~Worker() {}

		void run();
		void doCleanup();

		ImpP m_owner;
	};

	//---------------------------------------------------

	bool m_suspend;
	bool m_threadHasToDie;

	UINT m_threadCount;
	Mutex m_mutex;

	Condition m_cond;
	Condition m_taskQueueEmpty;

	//------
	Condition m_taskQueueNotEmpty;
	//------

	std::queue<TThread::RunnableP> m_tasks;
	std::map<long, Thread *> m_workerThreads;

	Imp(int threadsCount, bool suspend)
		: TSmartObject(), m_suspend(suspend), m_threadHasToDie(false), m_threadCount(threadsCount), m_tasks(), m_workerThreads(), m_mutex(), m_cond(){};

	~Imp()
	{
	}
};

//------------------------------------------------------------------------------

TThread::Executor::Executor(int threadsCount, bool suspend) : m_imp(new Imp(threadsCount, suspend))
{
	m_imp->addRef();
}

//------------------------------------------------------------------------------

TThread::Executor::~Executor()
{
	{
		TThread::ScopedLock sl(m_imp->m_mutex);

		if (m_imp->m_suspend) {
			m_imp->m_threadHasToDie = true;
			m_imp->m_taskQueueNotEmpty.notifyAll();
		}
	}

	m_imp->release();
}

//------------------------------------------------------------------------------

void TThread::Executor::Imp::Worker::run()
{
	try {
		while (true) {
			// check if thread has been canceled
			Thread::milestone();

			// get the next task
			RunnableP task = 0;

			{
				ScopedLock sl(m_owner->m_mutex);
				if (m_owner->m_tasks.empty()) {
					// la lista di task e' stata esaurita
					if (m_owner->m_suspend) {
						// il thread deve sospendersi
						m_owner->m_taskQueueNotEmpty.wait(sl);

						// a questo punto il thread e' stato risvegliato
						if (m_owner->m_threadHasToDie) {
							doCleanup();
							return;
						}
					} else {
						// il thread sta per morire -> bisogna eliminarlo dalla lista dei worker thread
						doCleanup();
						return;
					}
				}

				if (!m_owner->m_tasks.empty()) {
					task = m_owner->m_tasks.front();
					m_owner->m_tasks.pop();
				}
			}

			if (task)
				task->run();

			// check if thread has been canceled
			Thread::milestone();
		}
	} catch (TThread::Interrupt &) {
		//m_owner->m_cond.notifyOne();
	} catch (...) {

		// eccezione non prevista --> bisogna eliminare il thread
		// dalla lista dei worker thread

		ScopedLock sl(m_owner->m_mutex);
		doCleanup();
	}
}

//------------------------------------------------------------------------------

void TThread::Executor::Imp::Worker::doCleanup()
{
	std::map<long, Thread *>::iterator it = m_owner->m_workerThreads.find(reinterpret_cast<long>(this));

	if (it != m_owner->m_workerThreads.end()) {
		Thread *thread = it->second;
		delete thread;
		m_owner->m_workerThreads.erase(it);
	}

	if (m_owner->m_workerThreads.size() == 0)
		m_owner->m_taskQueueEmpty.notifyAll();
}

//------------------------------------------------------------------------------

void TThread::Executor::addTask(const RunnableP &task)
{
	TThread::ScopedLock sl(m_imp->m_mutex);
	m_imp->m_tasks.push(task);
	if (m_imp->m_workerThreads.size() < m_imp->m_threadCount) {
		TThread::Executor::Imp::Worker *worker =
			new TThread::Executor::Imp::Worker(m_imp);

		m_imp->m_workerThreads[reinterpret_cast<long>(worker)] = new Thread(worker);
	} else {
		if (m_imp->m_suspend)
			// risveglia uno dei thread in attesa
			m_imp->m_taskQueueNotEmpty.notifyOne();
	}
}

//------------------------------------------------------------------------------

void TThread::Executor::clear()
{
	ScopedLock sl(m_imp->m_mutex);

	while (!m_imp->m_tasks.empty())
		m_imp->m_tasks.pop();
}

//------------------------------------------------------------------------------

void TThread::Executor::cancel()
{
	{
		ScopedLock sl(m_imp->m_mutex);

		while (!m_imp->m_tasks.empty())
			m_imp->m_tasks.pop();
	}

	while (true) {
		Thread *thread = 0;
		{
			ScopedLock sl(m_imp->m_mutex);
			if (m_imp->m_workerThreads.empty())
				break;
			else {
				std::map<long, Thread *>::iterator it = m_imp->m_workerThreads.begin();
				thread = it->second;
				m_imp->m_workerThreads.erase(it);

				if (thread)
					thread->cancel();
			}
		}
	}
}

//------------------------------------------------------------------------------

void TThread::Executor::wait()
{
	TThread::ScopedLock sl(m_imp->m_mutex);

	while (m_imp->m_workerThreads.size())
		m_imp->m_taskQueueEmpty.wait(sl);
}

//------------------------------------------------------------------------------

bool TThread::Executor::wait(long timeout)
{
	TThread::ScopedLock sl(m_imp->m_mutex);

	bool expired = false;
	while (m_imp->m_workerThreads.size())
		expired = m_imp->m_taskQueueEmpty.wait(sl, timeout);

	return expired;
}

//------------------------------------------------------------------------------

int TThread::Executor::getThreadCount()
{
	TThread::ScopedLock sl(m_imp->m_mutex);
	return m_imp->m_workerThreads.size();
}

//------------------------------------------------------------------------------

int TThread::Executor::getTaskCount()
{
	TThread::ScopedLock sl(m_imp->m_mutex);
	return m_imp->m_tasks.size();
}