#include "threadpool.h"
ThreadPool::ThreadPool():
lastId() { }
ThreadPool::~ThreadPool()
{ stop(true); }
void ThreadPool::threadRun(Thread &thread) {
Task task(nullptr);
while(true) {
{ // wait for new task
std::unique_lock<std::mutex> lock(mutex);
thread.taskId = 0;
thread.taskOwner.reset();
thread.condition.notify_all();
while(queue.empty() && !thread.stopping)
condition.wait(lock);
if (thread.stopping)
break;
TaskDesc &desc = queue.front();
thread.taskId = desc.id;
thread.taskOwner = desc.owner;
task = thread.taskOwner ? desc.task : nullptr;
queue.pop_front();
}
if (task)
task();
}
}
ThreadPool::TaskId ThreadPool::enqueue(const Task &task, const TaskOwner::Handle &owner) {
std::lock_guard<std::mutex> lock(mutex);
queue.emplace_back(++lastId, TaskOwner::Handle::Weak(owner), task);
condition.notify_one();
return lastId;
}
int ThreadPool::cancelById(TaskId id, bool wait) {
std::unique_lock<std::mutex> lock(mutex);
for(Queue::iterator i = queue.begin(); i != queue.end(); ++i)
if (i->id == id)
{ i = queue.erase(i); return 1; }
if (wait)
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
while(i->taskId == id)
i->condition.wait(lock);
return 0;
}
int ThreadPool::cancelByOwner(const TaskOwner::Handle &owner, bool wait) {
std::unique_lock<std::mutex> lock(mutex);
int count = 0;
for(Queue::iterator i = queue.begin(); i != queue.end(); ++i)
if (TaskOwner::Handle(i->owner) == owner)
{ i = queue.erase(i); ++count; }
if (wait)
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
while(i->taskOwner == owner)
i->condition.wait(lock);
return count;
}
int ThreadPool::cancelAll(bool wait) {
std::unique_lock<std::mutex> lock(mutex);
int count = queue.size();
queue.clear();
if (wait)
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
while(i->taskId)
i->condition.wait(lock);
return count;
}
void ThreadPool::start(int count, bool cancelAllTasks) {
std::lock_guard<std::mutex> lockStartStop(mutexStartStop);
// stopping
{ // set stop marks
std::lock_guard<std::mutex> lock(mutex);
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
i->stopping = true;
}
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i) {
i->thread->join();
delete i->thread;
}
threads.clear();
if (cancelAllTasks)
queue.clear();
if (count <= 0)
return;
{ // starting
std::lock_guard<std::mutex> lock(mutex);
threads.resize(count);
for(ThreadList::iterator i = threads.begin(); i != threads.end(); ++i)
i->thread = new std::thread(&ThreadPool::threadRun, this, std::ref(*i));
}
}