Blame threadpool.cpp

d0530b
d0530b
#include "threadpool.h"
d0530b
d0530b
d0530b
// Constructs a pool with a value-initialized task id counter.
// No worker thread is created here.
ThreadPool::ThreadPool()
	: lastId()
{ }
d0530b
d0530b
d0530b
// Shuts the pool down on destruction by calling stop(true).
ThreadPool::~ThreadPool() {
	stop(true);
}
d0530b
d0530b
d0530b
// Worker loop executed by a pool thread.
//
// Each round the worker marks itself idle, sleeps until the queue has an
// entry or its `stopping` flag is set, then takes the front entry and runs
// it with the pool mutex released. The function returns once `stopping`
// is observed; entries still queued at that point are left in the queue.
void ThreadPool::threadRun(Thread &thread) {
	Task job(nullptr);
	for(;;) {
		{ // fetch the next queue entry under the pool mutex
			std::unique_lock<std::mutex> guard(mutex);

			// publish the idle state and wake anybody waiting in cancel*(..., true)
			thread.taskId = 0;
			thread.taskOwner.reset();
			thread.condition.notify_all();

			condition.wait(guard, [this, &thread]() {
				return !queue.empty() || thread.stopping;
			});
			if (thread.stopping)
				return;

			TaskDesc &next = queue.front();
			thread.taskId = next.id;
			thread.taskOwner = next.owner;
			// an entry whose owner handle turns out empty is consumed but not executed
			if (thread.taskOwner)
				job = next.task;
			else
				job = nullptr;

			queue.pop_front();
		}

		// run the task without holding the pool mutex
		if (job)
			job();
	}
}
d0530b
d0530b
98bb38
// Appends a task to the back of the queue and wakes one waiting worker.
//
// The queue entry stores a weak handle made from `owner`.
// Returns the id assigned to the new task (the incremented `lastId`).
ThreadPool::TaskId ThreadPool::enqueue(const Task &task, const TaskOwner::Handle &owner) {
	std::lock_guard<std::mutex> guard(mutex);
	const TaskId newId = ++lastId;
	queue.emplace_back(newId, TaskOwner::Handle::Weak(owner), task);
	condition.notify_one();
	return newId;
}
d0530b
d0530b
541903
// Cancels the task with the given id.
//
// If the task is still queued it is removed and 1 is returned immediately.
// Otherwise 0 is returned; when `wait` is true the call first blocks until
// no worker reports `id` as its current task.
int ThreadPool::cancelById(TaskId id, bool wait) {
	std::unique_lock<std::mutex> guard(mutex);

	// look for the task among the queued entries
	Queue::iterator entry = queue.begin();
	while(entry != queue.end() && entry->id != id)
		++entry;
	if (entry != queue.end()) {
		queue.erase(entry);
		return 1;
	}

	if (!wait)
		return 0;

	// not queued: wait until no worker is running it
	for(auto &worker : threads)
		while(worker.taskId == id)
			worker.condition.wait(guard);
	return 0;
}
d0530b
d0530b
98bb38
// Cancels every queued task whose owner handle compares equal to `owner`.
//
// When `wait` is true the call additionally blocks until no worker reports
// `owner` as the owner of its current task.
// Returns the number of tasks removed from the queue.
int ThreadPool::cancelByOwner(const TaskOwner::Handle &owner, bool wait) {
	std::unique_lock<std::mutex> lock(mutex);
	int count = 0;
	// erase() already returns the iterator to the following element, so the
	// iterator is advanced only when nothing was erased; advancing in both
	// cases would skip entries and step past end() after erasing the last one
	for(Queue::iterator i = queue.begin(); i != queue.end(); ) {
		if (TaskOwner::Handle(i->owner) == owner) {
			i = queue.erase(i);
			++count;
		} else {
			++i;
		}
	}
	if (wait)
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
			while(i->taskOwner == owner)
				i->condition.wait(lock);
	return count;
}
d0530b
d0530b
541903
int ThreadPool::cancelAll(bool wait) {
d0530b
	std::unique_lock<std::mutex> lock(mutex);</std::mutex>
d0530b
	int count = queue.size();
d0530b
	queue.clear();
541903
	if (wait)
541903
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
541903
			while(i->taskId)
541903
				i->condition.wait(lock);
d0530b
	return count;
d0530b
}
d0530b
d0530b
d0530b
void ThreadPool::start(int count, bool cancelAllTasks) {
d0530b
	std::lock_guard<std::mutex> lockStartStop(mutexStartStop);</std::mutex>
d0530b
	
d0530b
	// stopping
d0530b
	{  // set stop marks
d0530b
		std::lock_guard<std::mutex> lock(mutex);</std::mutex>
d0530b
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
d0530b
			i->stopping = true;
d0530b
	} 
d0530b
	for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i) {
d0530b
		i->thread->join();
d0530b
		delete i->thread;
d0530b
	}
d0530b
	threads.clear();
d0530b
	
d0530b
	if (cancelAllTasks)
d0530b
		queue.clear();
d0530b
	if (count <= 0)
d0530b
		return;
d0530b
	
d0530b
	{ // starting
d0530b
		std::lock_guard<std::mutex> lock(mutex);</std::mutex>
d0530b
		threads.resize(count);
d0530b
		for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
d0530b
			i->thread = new std::thread(&ThreadPool::threadRun, this, std::ref(*i));
d0530b
	}
d0530b
}
d0530b