35#include "PTL/Config.hh"
43#if defined(PTL_USE_TBB)
44# if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
45# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
47# if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
48# define TBB_PREVIEW_GLOBAL_CONTROL 1
50# include <tbb/global_control.h>
51# include <tbb/task_arena.h>
52# include <tbb/task_group.h>
69#include <unordered_map>
79static const short STARTED = 0;
80static const short PARTIAL = 1;
81static const short STOPPED = 2;
82static const short NONINIT = 3;
90 template <
typename KeyT,
typename MappedT,
typename HashT = KeyT>
91 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
120 static std::atomic<intmax_t> assigned;
121 intmax_t _assign = assigned++;
122 return _assign % Thread::hardware_concurrency();
162 bool _use_affinity = f_use_cpu_affinity(),
167 bool _use_affinity = f_use_cpu_affinity(),
182 template <
typename FuncT>
185 template <
typename FuncT>
227 template <
typename ListT>
241 m_init_func = []() {};
245 m_fini_func = []() {};
270 bool is_main()
const {
return ThisThread::get_id() == m_main_tid; }
296 bool m_use_affinity =
false;
297 bool m_tbb_tp =
false;
298 bool m_delete_task_queue =
false;
299 int m_verbose = f_verbose();
300 int m_priority = f_thread_priority();
302 ThreadId m_main_tid = ThisThread::get_id();
305 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0);
306 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);
309 lock_t m_task_lock = std::make_shared<Mutex>();
311 condition_t m_task_cond = std::make_shared<Condition>();
332 static bool& f_use_tbb();
333 static bool& f_use_cpu_affinity();
334 static int& f_thread_priority();
335 static int& f_verbose();
345 if(m_thread_awake->load() < m_pool_size)
348 m_task_cond->notify_one();
357 m_task_cond->notify_all();
367 if(m_thread_awake->load() < m_pool_size)
370 if(ntasks < this->
size())
373 m_task_cond->notify_one();
377 m_task_cond->notify_all();
394#if defined(PTL_USE_TBB)
396 if(!m_tbb_task_arena)
406 if(!m_tbb_task_arena)
409 return m_tbb_task_arena;
417 m_task_queue->
resize(
static_cast<intmax_t
>(_n));
423 auto&& _func = [_task]() { (*_task)(); };
425 if(m_tbb_tp && m_tbb_task_group)
428 _arena->execute([
this, _func]() { this->m_tbb_task_group->
run(_func); });
453 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
459template <
typename ListT>
472 auto c_size = c.size();
475 if(!itr->is_native_task())
491template <
typename FuncT>
495 if(m_tbb_tp && m_tbb_task_group)
497#if defined(PTL_USE_TBB)
502 std::set<std::thread::id> _first{};
508 if(_first.find(std::this_thread::get_id()) == _first.end())
513 _first.insert(std::this_thread::get_id());
525 std::atomic<size_t> _total_init{ 0 };
536 size_t _dmax = std::max<size_t>(_ncore, 8);
538 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
540 std::function<void()> _init_task;
543 static thread_local size_type _depth = 0;
556 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
559 tg.
run([&]() { _init_task(); });
560 tg.run([&]() { _init_task(); });
561 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
569 auto _fname = __FUNCTION__;
570 auto _write_info = [&]() {
571 std::cout <<
"[" << _fname <<
"]> Total initialized: " << _total_init
572 <<
", expected: " << _num <<
", max-parallel: " << _maxp
573 <<
", size: " << _sz <<
", ncore: " << _ncore << std::endl;
575 while(_total_init < _num)
581 [&]() { m_tbb_task_group->
run([&]() { _init_task(); }); });
583 _arena->execute([&]() { m_tbb_task_group->
wait(); });
585 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
591 if(nitr > 4 * (_ncore + 1))
609template <
typename FuncT>
614 if(m_tbb_tp && m_tbb_task_group)
616#if defined(PTL_USE_TBB)
621 std::set<std::thread::id> _first{};
627 if(_first.find(std::this_thread::get_id()) == _first.end())
632 _first.insert(std::this_thread::get_id());
644 std::atomic<size_t> _total_exec{ 0 };
648 size_t _dmax = std::max<size_t>(_ncore, 8);
650 size_t _num = _tids.size();
654 std::function<void()> _exec_task;
657 static thread_local size_type _depth = 0;
659 auto _this_tid = std::this_thread::get_id();
661 if(_tids.count(_this_tid) > 0)
671 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
674 tg.
run([&]() { _exec_task(); });
675 tg.run([&]() { _exec_task(); });
676 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
684 auto _fname = __FUNCTION__;
685 auto _write_info = [&]() {
686 std::cout <<
"[" << _fname <<
"]> Total executed: " << _total_exec
687 <<
", expected: " << _num <<
", size: " <<
size() << std::endl;
689 while(_total_exec < _num)
695 [&]() { m_tbb_task_group->
run([&]() { _exec_task(); }); });
697 _arena->execute([&]() { m_tbb_task_group->
wait(); });
699 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
705 if(nitr > 8 * (_num + 1))
#define PTL_DEFAULT_OBJECT(NAME)
static ThreadData *& GetInstance()
std::function< intmax_t(intmax_t)> affinity_func_t
ThreadPool(const Config &)
static bool get_default_use_cpu_affinity()
get the default use of cpu affinity
std::shared_ptr< Condition > condition_t
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
std::shared_ptr< std::atomic_uintmax_t > task_count_type
ThreadPool(ThreadPool &&)=default
std::map< uintmax_t, ThreadId > thread_index_map_t
static uintmax_t get_this_thread_id()
int insert(task_pointer &&, int=-1)
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
const pool_state_type & state() const
size_type add_task(task_pointer &&task, int bin=-1)
ThreadPool(const ThreadPool &)=delete
void set_priority(int _prio, Thread &) const
int run_on_this(task_pointer &&)
std::map< ThreadId, uintmax_t > thread_id_map_t
void set_finalization(finalize_func_t f)
static initialize_func_t & initialization_functor()
task_queue_t *& get_valid_queue(task_queue_t *&) const
std::vector< Thread > thread_vec_t
size_type add_tasks(ListT &)
static size_type get_default_size()
get the default pool size
void execute_on_specific_threads(const std::set< std::thread::id > &_tid, FuncT &&_func)
void resize(size_type _n)
std::shared_ptr< Mutex > lock_t
static uintmax_t get_thread_id(ThreadId)
bool is_tbb_threadpool() const
std::shared_ptr< task_type > task_pointer
void set_affinity(affinity_func_t f)
ThreadPool & operator=(const ThreadPool &)=delete
static bool get_default_use_tbb()
get the default use of tbb
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
static affinity_func_t & affinity_functor()
int get_active_threads_count() const
Thread * get_thread(std::thread::id id) const
std::shared_ptr< std::atomic_short > pool_state_type
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
static tbb_global_control_t *& tbb_global_control()
ThreadPool & operator=(ThreadPool &&)=default
static void set_default_size(size_type _v)
set the default pool size
task_queue_t * get_queue() const
Thread * get_thread(size_type _n) const
static finalize_func_t & finalization_functor()
std::shared_ptr< std::atomic_bool > atomic_bool_type
tbb_task_arena_t * get_task_arena()
bool using_affinity() const
static void set_default_verbose(int _v)
set the default verbosity
static void set_default_use_tbb(bool _v)
set the default use of tbb
void execute_on_all_threads(FuncT &&_func)
std::function< void()> finalize_func_t
static int get_default_verbose()
get the default verbosity
static const thread_id_map_t & get_thread_ids()
static int get_default_scheduling_priority()
get the default scheduling priority of threads in thread-pool
static void set_default_use_cpu_affinity(bool _v)
set the default use of cpu affinity
std::vector< std::shared_ptr< ThreadData > > thread_data_t
std::deque< ThreadId > thread_list_t
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
static void set_default_scheduling_priority(int _v)
set the default scheduling priority of threads in thread-pool
void execute_thread(VUserTaskQueue *)
std::function< void()> initialize_func_t
void reset_finalization()
bool is_initialized() const
std::vector< bool > bool_list_t
void set_initialization(initialize_func_t f)
void reset_initialization()
static void set_use_tbb(bool _v)
VUserTaskQueue task_queue_t
VTask is the abstract class stored in thread_pool.
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
@ max_allowed_parallelism
static size_t active_value(parameter param)
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
unsigned GetNumberOfCores()
tbb::task_arena tbb_task_arena_t
tbb::task_group tbb_task_group_t
VUserTaskQueue * task_queue
affinity_func_t set_affinity
Config(bool, bool, bool, int, int, size_type, VUserTaskQueue *, affinity_func_t, initialize_func_t, finalize_func_t)
initialize_func_t initializer
finalize_func_t finalizer