/* === S Y N F I G ========================================================= */
/*! \file threadpool.cpp
** \brief ThreadPool File
**
** $Id$
**
** \legal
** ......... ... 2018 Ivan Mahonin
**
** This package is free software; you can redistribute it and/or
** modify it under the terms of the GNU General Public License as
** published by the Free Software Foundation; either version 2 of
** the License, or (at your option) any later version.
**
** This package is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** General Public License for more details.
** \endlegal
*/
/* ========================================================================= */
/* === H E A D E R S ======================================================= */
#ifdef USING_PCH
# include "pch.h"
#else
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
//#define DEBUG_PTHREAD_MEASURE
#ifdef DEBUG_PTHREAD_MEASURE
#include <pthread.h>
#include <time.h>
#endif
#include <cassert>
#include <synfig/localization.h>
#include <synfig/general.h>
#include "threadpool.h"
#endif
/* === U S I N G =========================================================== */
using namespace std;
using namespace synfig;
/* === M A C R O S ========================================================= */
/* === G L O B A L S ======================================================= */
/* === M E T H O D S ======================================================= */
// The single pool object handed out by instance(): null until subsys_init()
// allocates it, and reset to null again by subsys_stop() after deleting it.
ThreadPool* ThreadPool::instance_ = 0;
// ThreadPool::Group
// Builds an empty group: not in multithreaded mode, no workers running,
// and no weight accumulated yet.
ThreadPool::Group::Group():
	multithreading(false),
	running_threads(0),
	sum_weight(0.0)
{ }
// Runs whatever is still queued before the group goes away; run() also waits
// for every worker it dispatched, so it returns only after they are done.
ThreadPool::Group::~Group()
{
	run();
}
// Executes the tasks with indices in [begin, end) and then reports completion.
// This is the callable that run() hands to the pool for each batch.
void
ThreadPool::Group::process(int begin, int end) {
	for(int index = begin; index < end; ++index) {
		try {
			tasks[index].second();
		} catch(...) {
			// a failing task is ignored so the remaining ones still run
		}
	}

	// completion is reported under the group mutex; the thread that brings
	// the counter down to zero signals the group's condition
	Glib::Threads::Mutex::Lock guard(mutex);
	if (--running_threads == 0)
		cond.signal();
}
void
ThreadPool::Group::enqueue(const Slot &slot, Real weight) {
weight = std::max(real_precision<Real>(), std::min(1.0/real_precision<Real>(), weight));
tasks.push_back(Entry(weight, slot));
sum_weight += weight;
}
// Executes every queued task and returns once all of them have finished.
//
// Tasks are walked in order while their weights are accumulated. Each time a
// batch reaches a weight of 0.75 it is handed to the pool as one job; the walk
// stops as soon as the weight still left over drops below 0.75. The remaining
// tail is executed in the calling thread, or handed to the pool as well when
// force_thread is true.
//
//   force_thread - when true, the tail batch is also dispatched to the pool
//                  instead of being executed in the calling thread.
//
// An exception thrown by a task executed in the calling thread is rethrown to
// the caller, but only after all dispatched batches have finished and the
// group has been reset: workers hold a pointer to this group, and leaving
// early would also keep the tasks queued, so the destructor's run() would
// dispatch them a second time.
void
ThreadPool::Group::run(bool force_thread) {
	// enqueue parallel tasks
	Real sum = 0.0;
	int begin = 0;
	int end = 0;
	for(int i = 0; i < (int)tasks.size(); ++i) {
		end = i + 1;
		sum += tasks[i].first;
		// too little weight left for another batch: the rest forms the tail
		if (sum_weight - sum < 0.75) break;
		if (sum >= 0.75) {
			multithreading = true;
			++running_threads;
			instance().enqueue( sigc::bind( sigc::mem_fun(this, &Group::process), begin, end ));
			sum_weight -= sum;
			sum = 0.0;
			begin = end;
		}
	}
	end = (int)tasks.size();

	// run the tail
	if (begin < end) {
		if (force_thread) {
			multithreading = true;
			++running_threads;
			instance().enqueue( sigc::bind( sigc::mem_fun(this, &Group::process), begin, end ));
		} else {
			try {
				for(int i = begin; i < end; ++i)
					tasks[i].second();
			} catch(...) {
				// do not leave while workers may still be using this group
				if (multithreading) {
					Glib::Threads::Mutex::Lock lock(mutex);
					while(running_threads > 0) instance().wait(cond, mutex);
				}
				// drop the consumed state so nothing is executed twice
				multithreading = false;
				tasks.clear();
				sum_weight = 0.0;
				throw;
			}
		}
	}

	// wait
	if (multithreading) {
		Glib::Threads::Mutex::Lock lock(mutex);
		while(running_threads > 0) instance().wait(cond, mutex);
	}

	// reset
	multithreading = false;
	tasks.clear();
	sum_weight = 0.0;
}
// ThreadPool
// Sets up an empty pool and works out how many threads may run at once.
//
// The limit starts from the processor count reported by GLib. When the
// SYNFIG_GENERIC_THREADS environment variable is set, atoi(value) + 1 is used
// instead. Finally a limit above two is lowered by one and anything smaller
// is raised to two.
ThreadPool::ThreadPool():
	max_running_threads(0),
	last_thread_id(0),
	running_threads(0),
	ready_threads(0),
	queue_size(0),
	stopped(false)
{
	max_running_threads = g_get_num_processors();

	const char *requested = getenv("SYNFIG_GENERIC_THREADS");
	if (requested)
		max_running_threads = atoi(requested) + 1;

	// equivalent to: raise to at least 2, then subtract one when above 2
	max_running_threads = max_running_threads > 2 ? max_running_threads - 1 : 2;

	// one running thread is counted from the start
	// (presumably the thread that constructs the pool - TODO confirm)
	++running_threads;

#ifdef DEBUG_PTHREAD_MEASURE
	info("ThreadPool created with max running threads: %d", max_running_threads - 1);
#endif
}
// Stops the pool: raises the stop flag, wakes every waiting worker and joins
// all worker threads. Tasks still sitting in the queue are not executed.
ThreadPool::~ThreadPool() {
#ifdef DEBUG_PTHREAD_MEASURE
	info("ThreadPool destroying with tasks in queue: %d, and tasks in process: %d", (int)queue_size, (int)running_threads);
#endif

	// ask the workers to leave their loops
	{
		Glib::Threads::Mutex::Lock lock(mutex);
		stopped = true;
		cond.broadcast();
	}

	// join the workers one by one; the mutex is released before join() so the
	// thread being joined can still take it on its way out
	while(true) {
		Glib::Threads::Thread *thread = 0;
		{
			Glib::Threads::Mutex::Lock lock(mutex);
			if (threads.empty()) break;
			stopped = true;
			cond.broadcast();
			thread = threads.back();
			threads.pop_back();
		}
		thread->join();
	}

#ifdef DEBUG_PTHREAD_MEASURE
	{
		Glib::Threads::Mutex::Lock lock(mutex);
		// size() returns an unsigned size type, so convert it to match "%d"
		info("ThreadPool destroyed with unprocessed tasks in queue: %d", (int)queue.size());
	}
#endif
}
// Body of every worker thread created by wakeup(): repeatedly takes one slot
// from the queue and invokes it until the pool is stopped.
// The int parameter is the worker number passed by wakeup(); it only gets a
// name (id) when DEBUG_PTHREAD_MEASURE is defined, because only the debug
// output uses it.
void
ThreadPool::thread_loop(int
#ifdef DEBUG_PTHREAD_MEASURE
id
#endif
) {
// a freshly started worker counts as running
++running_threads;
#ifdef DEBUG_PTHREAD_MEASURE
info("started new thread #%d in ThreadPool", id);
clockid_t clock_id;
pthread_getcpuclockid(pthread_self(), &clock_id);
#endif
while(true) {
Slot slot;
{
// the queue and the stop flag are examined under the pool mutex
Glib::Threads::Mutex::Lock lock(mutex);
// sleep while there is nothing to do, or while more threads are running
// than the limit allows; a sleeping worker is counted as ready, not running
while(!stopped && (queue.empty() || running_threads > max_running_threads)) {
++ready_threads;
--running_threads;
cond.wait(mutex);
++running_threads;
--ready_threads;
}
// leave the loop once the pool is stopped, even if tasks remain queued
if (stopped) break;
slot = queue.front();
queue.pop();
--queue_size;
}
#ifdef DEBUG_PTHREAD_MEASURE
struct timespec spec;
clock_gettime(clock_id, &spec);
long long time0 = spec.tv_sec*1000000000ll + spec.tv_nsec;
long long rtime0 = g_get_monotonic_time();
info( "ThreadPool thread #%d: begin, running threads: %d, ready threads: %d, queue size: %d",
id,
(int)running_threads,
(int)ready_threads,
(int)queue_size );
#endif
// the task itself runs after the lock scope above has been closed
slot();
#ifdef DEBUG_PTHREAD_MEASURE
clock_gettime(clock_id, &spec);
long long time1 = spec.tv_sec*1000000000ll + spec.tv_nsec;
long long rtime1 = g_get_monotonic_time();
info( "ThreadPool thread #%d: processed task for %.6f (real %.6f)",
id,
(double)(time1 - time0)*1e-9,
(double)(rtime1 - rtime0)*1e-6 );
#endif
}
#ifdef DEBUG_PTHREAD_MEASURE
info("thread #%d in ThreadPool stopped", id);
#endif
// the worker is exiting and no longer counts as running
--running_threads;
}
// Gets enough workers going for the tasks currently queued, without exceeding
// the running-thread limit: ready (sleeping) workers are signalled first and
// new threads are created only for the part they cannot cover.
// Both callers in this file (enqueue() and wait()) hold the pool mutex.
void
ThreadPool::wakeup() {
	// how many additional workers could usefully run right now
	const int wanted = std::max(0, std::min((int)queue_size, max_running_threads - (int)running_threads));
	// part of the demand that ready workers cannot satisfy
	const int create_count = std::max(0, wanted - (int)ready_threads);
	// the rest is served by waking ready workers
	const int signal_count = std::max(0, wanted - create_count);

	for(int created = 0; created < create_count; ++created) {
		threads.push_back(
			Glib::Threads::Thread::create(
				sigc::bind( sigc::mem_fun(this, &ThreadPool::thread_loop), ++last_thread_id )));
	}

	for(int signalled = 0; signalled < signal_count; ++signalled)
		cond.signal();
}
// Adds a task to the pool queue and makes sure a worker will pick it up.
//   slot - callable to be invoked later by one of the worker threads
void
ThreadPool::enqueue(const Slot &slot) {
	Glib::Threads::Mutex::Lock guard(mutex);

	// the counter is kept in step with the queue contents
	++queue_size;
	queue.push(slot);

	// wake a ready worker or create a new one if the limit allows it
	wakeup();
}
// Blocks the calling thread on an external condition while letting the pool
// use the slot it frees in the meantime.
//   cond  - condition to wait on
//   mutex - mutex passed to cond.wait(); Group::run() in this file holds it
//           locked when calling (assumed to be required - TODO confirm)
// The caller stops counting as running for the duration of the wait.
void
ThreadPool::wait(Glib::Threads::Cond &cond, Glib::Threads::Mutex &mutex) {
	const bool below_limit = --running_threads < max_running_threads;

	// wakeup or create ready thread if we have tasks in queue
	if (below_limit && queue_size) {
		Glib::Threads::Mutex::Lock pool_guard(this->mutex);
		wakeup();
	}

	cond.wait(mutex);
	++running_threads;
}
// Returns the pool created by subsys_init().
// Asserts that the pool exists; calling this before subsys_init() or after
// subsys_stop() dereferences a null pointer when assertions are disabled.
ThreadPool&
ThreadPool::instance() {
	ThreadPool *pool = instance_;
	assert(pool);
	return *pool;
}
// Creates the pool instance. Expected to be called while no instance exists
// (asserted); if one already exists it is kept untouched.
// Always returns true.
bool
ThreadPool::subsys_init() {
	assert(!instance_);
	if (instance_)
		return true;

	instance_ = new ThreadPool();
	return true;
}
// Destroys the pool instance and clears the pointer. Expected to be called
// while an instance exists (asserted).
// Always returns true.
bool
ThreadPool::subsys_stop() {
	assert(instance_);

	// deleting a null pointer is a no-op, so no explicit check is needed
	delete instance_;
	instance_ = 0;
	return true;
}