// Reconstructed from a patch that adds two new files, threadpool.h and
// threadpool.cpp. The header section is placed first so that the text below
// also compiles as a single translation unit.

// ============================= threadpool.h ==============================
#ifndef THREADPOOL_H
#define THREADPOOL_H


#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>


/// A fixed-size pool of worker threads that run queued tasks in FIFO order.
///
/// Every task gets a non-zero id from enqueue() and may carry an opaque
/// `owner` pointer; queued tasks can be cancelled by id, by owner, or all at
/// once. The cancel functions only remove tasks that are still queued; for a
/// task that is already running they block until it has finished.
///
/// NOTE: the cancel functions walk the worker list and may block on a
/// worker's condition variable, while start()/stop() destroy the worker
/// records. Do not call cancel*() concurrently with start()/stop().
/// NOTE: an exception escaping a task is not caught by the worker thread.
class ThreadPool {
public:
    typedef unsigned long long TaskId;
    typedef std::function<void()> Task;

    /// A queued task together with its id and owner tag.
    struct TaskDesc {
        TaskId id;
        const void* owner;
        Task task;

        TaskDesc():
            id(), owner(), task(nullptr) { }
        TaskDesc(TaskId id, const void* owner, const Task &task):
            id(id), owner(owner), task(task) { }
    };

    /// Per-worker bookkeeping. All fields except `thread` are guarded by
    /// ThreadPool::mutex.
    struct Thread {
        std::thread *thread;               ///< owned by the pool; created and deleted in start()
        std::condition_variable condition; ///< notified when the worker becomes idle
        TaskId taskId;                     ///< id of the running task, 0 when idle
        const void* taskOwner;             ///< owner of the running task, nullptr when idle
        bool stopping;                     ///< set by start() to make the worker exit

        Thread():
            thread(), taskId(), taskOwner(), stopping() { }
    };

    // std::list keeps element addresses stable; Thread is neither copyable
    // nor movable because it holds a condition variable.
    typedef std::list<Thread> ThreadList;
    typedef std::deque<TaskDesc> Queue;

private:
    std::mutex mutexStartStop;         ///< serialises start()/stop() calls
    std::mutex mutex;                  ///< guards queue, lastId and worker state
    std::condition_variable condition; ///< signals "task queued" or "stop requested"
    ThreadList threads;
    Queue queue;
    TaskId lastId;

    void threadRun(Thread &thread);

public:
    ThreadPool();
    ~ThreadPool();

    TaskId enqueue(const Task &task, const void *owner = nullptr);
    int cancelById(TaskId id);
    int cancelByOwner(const void *owner);
    int cancelAll();

    void start(int count, bool cancelAllTasks = false);

    /// Stops all workers; equivalent to start(0, cancelAllTasks).
    void stop(bool cancelAllTasks = false)
        { start(0, cancelAllTasks); }
};


#endif

// ============================ threadpool.cpp =============================
// (When built as a separate file, this part begins with
//  #include "threadpool.h".)

#include <algorithm>
#include <iterator>
#include <utility>


/// Creates an empty pool with no workers; call start() to launch them.
ThreadPool::ThreadPool():
    lastId() { }


/// Stops every worker and discards tasks that are still queued.
ThreadPool::~ThreadPool()
    { stop(true); }


/// Worker main loop: take the next task from the queue, run it, repeat until
/// this worker's `stopping` flag is set.
void ThreadPool::threadRun(Thread &thread) {
    Task task;
    while (true) {
        {   // wait for a new task
            std::unique_lock<std::mutex> lock(mutex);

            // Mark this worker idle and wake anyone in cancel*() who is
            // waiting for the previous task to finish.
            thread.taskId = 0;
            thread.taskOwner = nullptr;
            thread.condition.notify_all();

            while (queue.empty() && !thread.stopping)
                condition.wait(lock);
            // A stop request wins even when tasks are still queued; they stay
            // in the queue for the next set of workers.
            if (thread.stopping)
                break;

            TaskDesc &desc = queue.front();
            thread.taskId = desc.id;
            thread.taskOwner = desc.owner;
            task = std::move(desc.task);
            queue.pop_front();
        }

        if (task)
            task();
        // Release the callable (and whatever it captured) right away and
        // outside the lock, instead of keeping it alive until the next task
        // is assigned under the pool mutex.
        task = nullptr;
    }
}


/// Appends a task to the queue and wakes one idle worker.
/// @param task   callable to run; an empty function is queued but not invoked
/// @param owner  opaque tag usable later with cancelByOwner()
/// @return the id assigned to the task (ids start at 1)
ThreadPool::TaskId ThreadPool::enqueue(const Task &task, const void *owner) {
    std::lock_guard<std::mutex> lock(mutex);
    queue.emplace_back(++lastId, owner, task);
    condition.notify_one();
    return lastId;
}


/// Cancels the task with the given id.
/// If the task is still queued it is removed. Otherwise, if a worker is
/// running it, the call blocks until that worker has finished the task.
/// @return 1 if a queued task was removed, 0 otherwise
int ThreadPool::cancelById(TaskId id) {
    // Id 0 is never handed out by enqueue() and doubles as the "idle" marker
    // in Thread::taskId; waiting for it would block forever on an idle worker.
    if (id == 0)
        return 0;

    std::unique_lock<std::mutex> lock(mutex);
    const Queue::iterator pos = std::find_if(
        queue.begin(), queue.end(),
        [id](const TaskDesc &desc) { return desc.id == id; });
    if (pos != queue.end()) {
        queue.erase(pos);
        return 1;
    }

    for (Thread &worker : threads)
        while (worker.taskId == id)
            worker.condition.wait(lock);
    return 0;
}


/// Cancels every task tagged with `owner` (nullptr selects untagged tasks).
/// Queued tasks are removed; the call then blocks until no worker is running
/// a task with that owner.
/// @return the number of queued tasks that were removed
int ThreadPool::cancelByOwner(const void *owner) {
    std::unique_lock<std::mutex> lock(mutex);

    // Erase-remove: erasing inside a loop that also increments the iterator
    // skips elements and can step past end().
    const Queue::iterator firstRemoved = std::remove_if(
        queue.begin(), queue.end(),
        [owner](const TaskDesc &desc) { return desc.owner == owner; });
    const int count = static_cast<int>(std::distance(firstRemoved, queue.end()));
    queue.erase(firstRemoved, queue.end());

    // An idle worker also reports taskOwner == nullptr, so only wait on
    // workers that are actually running something (taskId != 0).
    for (Thread &worker : threads)
        while (worker.taskId != 0 && worker.taskOwner == owner)
            worker.condition.wait(lock);
    return count;
}


/// Removes every queued task, then blocks until all workers are idle.
/// @return the number of queued tasks that were removed
int ThreadPool::cancelAll() {
    std::unique_lock<std::mutex> lock(mutex);
    const int count = static_cast<int>(queue.size());
    queue.clear();
    for (Thread &worker : threads)
        while (worker.taskId != 0)
            worker.condition.wait(lock);
    return count;
}


/// Stops all current workers (letting each finish the task it is running),
/// then launches `count` new ones.
/// @param count           number of workers to start; <= 0 only stops
/// @param cancelAllTasks  when true, tasks still queued are discarded
void ThreadPool::start(int count, bool cancelAllTasks) {
    std::lock_guard<std::mutex> lockStartStop(mutexStartStop);

    {   // set stop marks
        std::lock_guard<std::mutex> lock(mutex);
        for (Thread &worker : threads)
            worker.stopping = true;
    }
    // Idle workers sleep on `condition`; without this wake-up they would
    // never see the flag and join() below would block forever.
    condition.notify_all();

    // Join without holding `mutex`: the workers need it in order to exit.
    for (Thread &worker : threads) {
        if (worker.thread) {
            worker.thread->join();
            delete worker.thread;
            worker.thread = nullptr;
        }
    }

    // The queue and the worker list are shared with enqueue()/cancel*().
    std::lock_guard<std::mutex> lock(mutex);
    threads.clear();
    if (cancelAllTasks)
        queue.clear();
    if (count <= 0)
        return;

    // starting
    threads.resize(static_cast<ThreadList::size_type>(count));
    for (Thread &worker : threads)
        worker.thread = new std::thread(&ThreadPool::threadRun, this, std::ref(worker));
}