Blob Blame Raw

#include "threadpool.h"


// Constructs the pool with a value-initialized task id counter.
// The constructor body itself does nothing; worker threads are only
// created by start().
ThreadPool::ThreadPool()
	: lastId()
{
}


// Shuts the pool down by calling stop(true).
// NOTE(review): stop() is declared outside this file section; the `true`
// argument presumably requests cancellation of still-queued tasks - confirm
// against the header.
ThreadPool::~ThreadPool()
{
	stop(true);
}


// Worker loop; start() launches one std::thread per Thread record running it.
//
// Each iteration, under the pool mutex:
//   1. marks the worker idle (taskId = 0, taskOwner reset) and notifies
//      thread.condition, which cancelById/cancelByOwner/cancelAll wait on;
//   2. sleeps on the pool-wide `condition` until the queue is non-empty
//      or thread.stopping is set; a stop request ends the loop even when
//      tasks are still queued;
//   3. takes the front task descriptor, publishes its id and owner in the
//      Thread record and removes the descriptor from the queue.
// The task is then executed outside the lock. A task whose owner handle
// evaluates to false after being taken from the descriptor is dropped
// without being run.
//
// thread - the per-worker record this thread reports its state through.
void ThreadPool::threadRun(Thread &thread) {
	Task task(nullptr);
	while(true) {
		{ // wait for new task
			std::unique_lock<std::mutex> lock(mutex);
			
			// report "idle" and wake everybody waiting for this worker to finish
			thread.taskId = 0;
			thread.taskOwner.reset();
			thread.condition.notify_all();
			
			while(queue.empty() && !thread.stopping)
				condition.wait(lock);
			if (thread.stopping)
				break;
			
			TaskDesc &desc = queue.front();
			thread.taskId = desc.id;
			thread.taskOwner = desc.owner;
			// `task` is always empty here (it is cleared after every run),
			// so a task without a live owner is simply left unset
			if (thread.taskOwner)
				task = desc.task;
			
			queue.pop_front();
		}
		
		if (task) {
			task();
			// drop the callable right away, outside of the lock: otherwise
			// whatever it captured would stay alive while the worker idles
			// and would later be destroyed while holding the pool mutex
			task = nullptr;
		}
	}
}


// Appends a task to the back of the queue and wakes one waiting worker.
//
// task  - callable to execute
// owner - handle stored in the queue as a weak handle; threadRun() skips
//         the task if the handle it obtains from it evaluates to false
//
// Returns the id given to the task, i.e. the pre-incremented `lastId`.
// The whole operation is performed under the pool mutex.
ThreadPool::TaskId ThreadPool::enqueue(const Task &task, const TaskOwner::Handle &owner) {
	std::lock_guard<std::mutex> guard(mutex);
	
	const TaskId id = ++lastId;
	queue.emplace_back(id, TaskOwner::Handle::Weak(owner), task);
	condition.notify_one();
	
	return id;
}


// Cancels the task with the given id.
//
// id   - id of the task to cancel
// wait - when the task is not found in the queue, block until no worker
//        reports `id` as its current task
//
// Returns 1 if the task was still queued and has been removed, 0 otherwise
// (including the case where it was already running and has been waited for).
int ThreadPool::cancelById(TaskId id, bool wait) {
	std::unique_lock<std::mutex> guard(mutex);
	
	// the task has not been picked up yet: just drop it from the queue
	Queue::iterator pending = queue.begin();
	while(pending != queue.end()) {
		if (pending->id == id) {
			queue.erase(pending);
			return 1;
		}
		++pending;
	}
	
	if (!wait)
		return 0;
	
	// workers notify their own condition whenever they become idle
	ThreadList::iterator worker = threads.begin();
	while(worker != threads.end()) {
		if (worker->taskId == id)
			worker->condition.wait(guard);
		else
			++worker;
	}
	return 0;
}


// Cancels every queued task that belongs to `owner`.
//
// owner - handle compared against the owner stored with each queued task
// wait  - additionally block until no worker reports `owner` as the owner
//         of the task it is currently running
//
// Returns the number of tasks removed from the queue; tasks that were
// already running (and possibly waited for) are not counted.
int ThreadPool::cancelByOwner(const TaskOwner::Handle &owner, bool wait) {
	std::unique_lock<std::mutex> lock(mutex);
	int count = 0;
	// erase() already yields the iterator to the next element, so the
	// iterator must be advanced only when nothing was erased; advancing it
	// unconditionally would skip elements and step past end()
	for(Queue::iterator i = queue.begin(); i != queue.end(); ) {
		if (TaskOwner::Handle(i->owner) == owner) {
			i = queue.erase(i);
			++count;
		} else {
			++i;
		}
	}
	if (wait)
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
			while(i->taskOwner == owner)
				i->condition.wait(lock); // notified by the worker when it becomes idle
	return count;
}


int ThreadPool::cancelAll(bool wait) {
	std::unique_lock<std::mutex> lock(mutex);
	int count = queue.size();
	queue.clear();
	if (wait)
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
			while(i->taskId)
				i->condition.wait(lock);
	return count;
}


void ThreadPool::start(int count, bool cancelAllTasks) {
	std::lock_guard<std::mutex> lockStartStop(mutexStartStop);
	
	// stopping
	{  // set stop marks
		std::lock_guard<std::mutex> lock(mutex);
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
			i->stopping = true;
	} 
	for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i) {
		i->thread->join();
		delete i->thread;
	}
	threads.clear();
	
	if (cancelAllTasks)
		queue.clear();
	if (count <= 0)
		return;
	
	{ // starting
		std::lock_guard<std::mutex> lock(mutex);
		threads.resize(count);
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
			i->thread = new std::thread(&ThreadPool::threadRun, this, std::ref(*i));
	}
}