Geant4 11.1.1
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
ThreadPool.hh
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class header file
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#pragma once
32
33#include "PTL/AutoLock.hh"
34#ifndef G4GMAKE
35#include "PTL/Config.hh"
36#endif
37#include "PTL/ThreadData.hh"
38#include "PTL/Threading.hh"
39#include "PTL/Types.hh"
40#include "PTL/VTask.hh"
41#include "PTL/VUserTaskQueue.hh"
42
43#if defined(PTL_USE_TBB)
44# if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
45# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
46# endif
47# if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
48# define TBB_PREVIEW_GLOBAL_CONTROL 1
49# endif
50# include <tbb/global_control.h>
51# include <tbb/task_arena.h>
52# include <tbb/task_group.h>
53#endif
54
55#include <algorithm>
56#include <atomic>
57#include <chrono>
58#include <cstdint>
59#include <cstdlib>
60#include <deque>
61#include <functional>
62#include <iostream>
63#include <map>
64#include <memory>
65#include <mutex> // IWYU pragma: keep
66#include <set>
67#include <thread>
68#include <type_traits> // IWYU pragma: keep
69#include <unordered_map>
70#include <utility>
71#include <vector>
72
73namespace PTL
74{
namespace thread_pool
{
/// Short-valued codes naming the states a thread-pool can be in.
///
/// The values are plain `short` constants (not an enum) so they can be
/// compared with, or stored into, a `short`-typed variable without a cast.
/// NOTE(review): presumably these are the values held by the pool's
/// `std::atomic_short` state -- confirm against ThreadPool.cc.
namespace state
{
static constexpr short STARTED = 0;
static constexpr short PARTIAL = 1;
static constexpr short STOPPED = 2;
static constexpr short NONINIT = 3;

}  // namespace state
}  // namespace thread_pool
86
88{
89public:
90 template <typename KeyT, typename MappedT, typename HashT = KeyT>
91 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
92
93 // pod-types
94 using size_type = size_t;
95 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>;
96 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>;
97 using pool_state_type = std::shared_ptr<std::atomic_short>;
98 using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
99 // objects
101 using lock_t = std::shared_ptr<Mutex>;
102 using condition_t = std::shared_ptr<Condition>;
103 using task_pointer = std::shared_ptr<task_type>;
105 // containers
106 using thread_list_t = std::deque<ThreadId>;
107 using bool_list_t = std::vector<bool>;
108 using thread_id_map_t = std::map<ThreadId, uintmax_t>;
109 using thread_index_map_t = std::map<uintmax_t, ThreadId>;
110 using thread_vec_t = std::vector<Thread>;
111 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
112 // functions
113 using initialize_func_t = std::function<void()>;
114 using finalize_func_t = std::function<void()>;
115 using affinity_func_t = std::function<intmax_t(intmax_t)>;
116
118 {
119 static affinity_func_t _v = [](intmax_t) {
120 static std::atomic<intmax_t> assigned;
121 intmax_t _assign = assigned++;
122 return _assign % Thread::hardware_concurrency();
123 };
124 return _v;
125 }
126
128 {
129 static initialize_func_t _v = []() {};
130 return _v;
131 }
132
134 {
135 static finalize_func_t _v = []() {};
136 return _v;
137 }
138
139 struct Config
140 {
142
143 Config(bool, bool, bool, int, int, size_type, VUserTaskQueue*, affinity_func_t,
145
146 bool init = true;
147 bool use_tbb = f_use_tbb();
148 bool use_affinity = f_use_cpu_affinity();
149 int verbose = f_verbose();
150 int priority = f_thread_priority();
151 size_type pool_size = f_default_pool_size();
156 };
157
158public:
159 // Constructor and Destructors
160 explicit ThreadPool(const Config&);
161 ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
162 bool _use_affinity = f_use_cpu_affinity(),
167 bool _use_affinity = f_use_cpu_affinity(),
169 VUserTaskQueue* task_queue = nullptr);
170 virtual ~ThreadPool();
171 ThreadPool(const ThreadPool&) = delete;
172 ThreadPool(ThreadPool&&) = default;
173 ThreadPool& operator=(const ThreadPool&) = delete;
175
176public:
177 // Public functions
178 size_type initialize_threadpool(size_type); // start the threads
179 size_type destroy_threadpool(); // destroy the threads
181
182 template <typename FuncT>
183 void execute_on_all_threads(FuncT&& _func);
184
185 template <typename FuncT>
186 void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
187 FuncT&& _func);
188
189 task_queue_t* get_queue() const { return m_task_queue; }
191
192 bool is_tbb_threadpool() const { return m_tbb_tp; }
193
194public:
195 // Public functions related to TBB
196 static bool using_tbb();
197 // enable using TBB if available - semi-deprecated
198 static void set_use_tbb(bool _v);
199
200 /// set the default use of tbb
201 static void set_default_use_tbb(bool _v) { set_use_tbb(_v); }
202 /// set the default use of cpu affinity
203 static void set_default_use_cpu_affinity(bool _v);
204 /// set the default scheduling priority of threads in thread-pool
205 static void set_default_scheduling_priority(int _v) { f_thread_priority() = _v; }
206 /// set the default verbosity
207 static void set_default_verbose(int _v) { f_verbose() = _v; }
208 /// set the default pool size
209 static void set_default_size(size_type _v) { f_default_pool_size() = _v; }
210
211 /// get the default use of tbb
212 static bool get_default_use_tbb() { return f_use_tbb(); }
213 /// get the default use of cpu affinity
214 static bool get_default_use_cpu_affinity() { return f_use_cpu_affinity(); }
215 /// get the default scheduling priority of threads in thread-pool
216 static int get_default_scheduling_priority() { return f_thread_priority(); }
217 /// get the default verbosity
218 static int get_default_verbose() { return f_verbose(); }
219 /// get the default pool size
220 static size_type get_default_size() { return f_default_pool_size(); }
221
222public:
223 // add tasks for threads to process
224 size_type add_task(task_pointer&& task, int bin = -1);
225 // size_type add_thread_task(ThreadId id, task_pointer&& task);
226 // add a generic container with iterator
227 template <typename ListT>
228 size_type add_tasks(ListT&);
229
231 Thread* get_thread(std::thread::id id) const;
232
233 // only relevant when compiled with PTL_USE_TBB
235
236 void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
237 void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }
238
240 {
241 m_init_func = []() {};
242 }
244 {
245 m_fini_func = []() {};
246 }
247
248public:
249 // get the pool state
250 const pool_state_type& state() const { return m_pool_state; }
251 // see how many main task threads there are
252 size_type size() const { return m_pool_size; }
253 // set the thread pool size
254 void resize(size_type _n);
255 // affinity assigns threads to cores, assignment at constructor
256 bool using_affinity() const { return m_use_affinity; }
257 bool is_alive() { return m_alive_flag->load(); }
258 void notify();
259 void notify_all();
260 void notify(size_type);
261 bool is_initialized() const;
262 int get_active_threads_count() const { return (int)m_thread_awake->load(); }
263
264 void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
265 void set_affinity(intmax_t i, Thread&) const;
266 void set_priority(int _prio, Thread&) const;
267
268 void set_verbose(int n) { m_verbose = n; }
269 int get_verbose() const { return m_verbose; }
270 bool is_main() const { return ThisThread::get_id() == m_main_tid; }
271
273
274public:
275 // read FORCE_NUM_THREADS environment variable
276 static const thread_id_map_t& get_thread_ids();
277 static uintmax_t get_thread_id(ThreadId);
278 static uintmax_t get_this_thread_id();
279 static uintmax_t add_thread_id(ThreadId = ThisThread::get_id());
280
281protected:
282 void execute_thread(VUserTaskQueue*); // function thread sits in
283 int insert(task_pointer&&, int = -1);
285
286protected:
287 // called in THREAD INIT
288 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
289
290 void record_entry();
291 void record_exit();
292
293private:
294 // Private variables
295 // random
296 bool m_use_affinity = false;
297 bool m_tbb_tp = false;
298 bool m_delete_task_queue = false;
299 int m_verbose = f_verbose();
300 int m_priority = f_thread_priority();
301 size_type m_pool_size = 0;
302 ThreadId m_main_tid = ThisThread::get_id();
303 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false);
304 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0);
305 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0);
306 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);
307
308 // locks
309 lock_t m_task_lock = std::make_shared<Mutex>();
310 // conditions
311 condition_t m_task_cond = std::make_shared<Condition>();
312
313 // containers
314 bool_list_t m_is_joined = {}; // join list
315 bool_list_t m_is_stopped = {}; // lets thread know to stop
316 thread_list_t m_main_threads = {}; // storage for active threads
317 thread_list_t m_stop_threads = {}; // storage for stopped threads
318 thread_vec_t m_threads = {};
319 thread_data_t m_thread_data = {};
320
321 // task queue
322 task_queue_t* m_task_queue = nullptr;
323 tbb_task_arena_t* m_tbb_task_arena = nullptr;
324 tbb_task_group_t* m_tbb_task_group = nullptr;
325
326 // functions
329 affinity_func_t m_affinity_func = affinity_functor();
330
331private:
332 static bool& f_use_tbb();
333 static bool& f_use_cpu_affinity();
334 static int& f_thread_priority();
335 static int& f_verbose();
336 static size_type& f_default_pool_size();
337 static thread_id_map_t& f_thread_ids();
338};
339
340//--------------------------------------------------------------------------------------//
341inline void
343{
344 // wake up one thread that is waiting for a task to be available
345 if(m_thread_awake->load() < m_pool_size)
346 {
347 AutoLock l(*m_task_lock);
348 m_task_cond->notify_one();
349 }
350}
351//--------------------------------------------------------------------------------------//
352inline void
354{
355 // wake all threads
356 AutoLock l(*m_task_lock);
357 m_task_cond->notify_all();
358}
359//--------------------------------------------------------------------------------------//
360inline void
362{
363 if(ntasks == 0)
364 return;
365
366 // wake up as many threads that tasks just added
367 if(m_thread_awake->load() < m_pool_size)
368 {
369 AutoLock l(*m_task_lock);
370 if(ntasks < this->size())
371 {
372 for(size_type i = 0; i < ntasks; ++i)
373 m_task_cond->notify_one();
374 }
375 else
376 {
377 m_task_cond->notify_all();
378 }
379 }
380}
381//--------------------------------------------------------------------------------------//
382// local function for getting the tbb task scheduler
//
// Returns a reference to a function-local `thread_local` pointer, so every
// thread sees (and may assign through the returned reference) its own
// tbb_global_control_t* slot. The slot starts out as nullptr and nothing in
// this function allocates the object it is meant to point at -- callers must
// be prepared for a null value.
385{
386 static thread_local tbb_global_control_t* _instance = nullptr;
387 return _instance;
388}
389//--------------------------------------------------------------------------------------//
390// task arena
// Returns the pool's task arena, creating it on the first call and handing
// back the cached m_tbb_task_arena pointer on every later call.
//
// With PTL_USE_TBB the arena is constructed in `attach` mode and then
// initialized with a concurrency of `_sz` and 1 slot reserved for masters
// (second argument of task_arena::initialize). Without PTL_USE_TBB a
// default-constructed tbb_task_arena_t is created instead.
//
// NOTE(review): the arena is allocated with a raw `new`; no matching release
// is visible in this header -- confirm it is freed in ThreadPool.cc.
391inline tbb_task_arena_t*
393{
394#if defined(PTL_USE_TBB)
395 // create a task arena
396 if(!m_tbb_task_arena)
397 {
 // `_sz` comes from the thread-local global-control object when one is
 // set for this thread and falls back to the pool size otherwise
398 auto _sz = (tbb_global_control())
401 : size();
402 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
403 m_tbb_task_arena->initialize(_sz, 1);
404 }
405#else
406 if(!m_tbb_task_arena)
407 m_tbb_task_arena = new tbb_task_arena_t{};
408#endif
409 return m_tbb_task_arena;
410}
411//--------------------------------------------------------------------------------------//
// Set the thread pool size to `_n`.
//
// The part visible here forwards the new size to the task queue, and only
// when a queue is attached (m_task_queue may be nullptr). The size is cast
// to intmax_t because that is the parameter type of the queue's resize().
412inline void
414{
416 if(m_task_queue)
417 m_task_queue->resize(static_cast<intmax_t>(_n));
418}
419//--------------------------------------------------------------------------------------//
420inline int
422{
423 auto&& _func = [_task]() { (*_task)(); };
424
425 if(m_tbb_tp && m_tbb_task_group)
426 {
427 auto* _arena = get_task_arena();
428 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
429 }
430 else
431 {
432 _func();
433 }
434 // return the number of tasks added to task-list
435 return 0;
436}
437//--------------------------------------------------------------------------------------//
438inline int
440{
441 static thread_local ThreadData* _data = ThreadData::GetInstance();
442
443 // pass the task to the queue
444 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
445 notify();
446 return (int)ibin;
447}
448//--------------------------------------------------------------------------------------//
451{
452 // if not native (i.e. TBB) or we haven't built thread-pool, just execute
453 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
454 return static_cast<size_type>(run_on_this(std::move(task)));
455
456 return static_cast<size_type>(insert(std::move(task), bin));
457}
458//--------------------------------------------------------------------------------------//
459template <typename ListT>
462{
463 if(!m_alive_flag) // if we haven't built thread-pool, just execute
464 {
465 for(auto& itr : c)
466 run(itr);
467 c.clear();
468 return 0;
469 }
470
471 // TODO: put a limit on how many tasks can be added at most
472 auto c_size = c.size();
473 for(auto& itr : c)
474 {
475 if(!itr->is_native_task())
476 --c_size;
477 else
478 {
479 //++(m_task_queue);
480 get_valid_queue(m_task_queue)->InsertTask(itr);
481 }
482 }
483 c.clear();
484
485 // notify sleeping threads
486 notify(c_size);
487
488 return c_size;
489}
490//--------------------------------------------------------------------------------------//
// Run `_func` once on each worker thread of the pool.
//
// Two paths:
//  * TBB pool with a task group: tasks are submitted repeatedly through the
//    task arena until `_num` distinct non-main threads have each run `_func`
//    once, or an iteration cap is hit. This path is compiled only when
//    PTL_USE_TBB is defined; without it the branch does nothing.
//  * otherwise, when a task queue is attached, the request is forwarded to
//    the queue's ExecuteOnAllThreads().
// When neither condition holds the call is a no-op.
//
// `_func` is invoked with no arguments and its result is ignored.
491template <typename FuncT>
492inline void
494{
495 if(m_tbb_tp && m_tbb_task_group)
496 {
497#if defined(PTL_USE_TBB)
498 // TBB lazily activates threads to process tasks and the main thread
499 // participates in processing the tasks so getting a specific
500 // function to execute only on the worker threads requires some trickery
501 //
 // ids of the threads that have already run _func during this call;
 // guarded by _mutex
502 std::set<std::thread::id> _first{};
503 Mutex _mutex{};
504 // runs _func at most once per thread: returns 1 when it ran, 0 otherwise
505 auto _init = [&]() {
506 int _once = 0;
507 _mutex.lock();
508 if(_first.find(std::this_thread::get_id()) == _first.end())
509 {
510 // first time this thread is seen during this call: record its id
511 // so that _func is not executed on it a second time
512 _once = 1;
513 _first.insert(std::this_thread::get_id());
514 }
515 _mutex.unlock();
 // _func runs outside the lock so threads do not serialize on it
516 if(_once != 0)
517 {
518 _func();
519 return 1;
520 }
521 return 0;
522 };
523 // this will collect the number of threads which have
524 // executed the _init function above
525 std::atomic<size_t> _total_init{ 0 };
526 // max parallelism by TBB
 // NOTE(review): tbb_global_control() hands back a thread-local pointer
 // that starts as nullptr and is dereferenced here without a check,
 // whereas get_task_arena() tests it first -- confirm it is always set
 // on the calling thread before this runs.
527 size_t _maxp = tbb_global_control()->active_value(
529 // create a task arena
530 auto* _arena = get_task_arena();
531 // size of the thread-pool
532 size_t _sz = size();
533 // number of cores
534 size_t _ncore = Threading::GetNumberOfCores();
535 // maximum depth for recursion
536 size_t _dmax = std::max<size_t>(_ncore, 8);
537 // how many threads we need to initialize
538 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
539 // this is the task passed to the task-group
540 std::function<void()> _init_task;
541 _init_task = [&]() {
 // per-thread nesting depth of _init_task, used to bound the recursion
543 static thread_local size_type _depth = 0;
544 int _ret = 0;
545 // don't let the main thread execute the function
546 if(!is_main())
547 {
548 // execute the function
549 _ret = _init();
550 // add the result
551 _total_init += _ret;
552 }
553 // if _func did not run on this thread just now (and the depth and
554 // total limits allow it), recursively spawn two more tasks
555 ++_depth;
556 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
557 {
558 tbb::task_group tg{};
559 tg.run([&]() { _init_task(); });
560 tg.run([&]() { _init_task(); });
561 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
562 tg.wait();
563 }
564 --_depth;
565 };
566
567 // TBB won't oversubscribe so we need to limit by ncores - 1
568 size_t nitr = 0;
569 auto _fname = __FUNCTION__;
 // diagnostic line summarising how far the initialization got
570 auto _write_info = [&]() {
571 std::cout << "[" << _fname << "]> Total initialized: " << _total_init
572 << ", expected: " << _num << ", max-parallel: " << _maxp
573 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
574 };
575 while(_total_init < _num)
576 {
 // submit 2 * _num - 1 tasks per round (pre-decrement runs first)
577 auto _n = 2 * _num;
578 while(--_n > 0)
579 {
580 _arena->execute(
581 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
582 }
 // block until every task submitted in this round has finished
583 _arena->execute([&]() { m_tbb_task_group->wait(); });
584 // don't loop infinitely but use a strict condition
585 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
586 {
587 _write_info();
588 break;
589 }
590 // at this point we need to exit
591 if(nitr > 4 * (_ncore + 1))
592 {
593 _write_info();
594 break;
595 }
596 }
597 if(get_verbose() > 3)
598 _write_info();
599#endif
600 }
601 else if(get_queue())
602 {
 // non-TBB pool: the task queue implements the per-thread execution
603 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
604 }
605}
606
607//--------------------------------------------------------------------------------------//
608
// Run `_func` once on each thread whose id is listed in `_tids`.
//
// Two paths:
//  * TBB pool with a task group: tasks are submitted repeatedly through the
//    task arena until every requested thread has run `_func` once, or an
//    iteration cap is hit. This path is compiled only when PTL_USE_TBB is
//    defined; without it the branch does nothing.
//  * otherwise, when a task queue is attached, the request is forwarded to
//    the queue's ExecuteOnSpecificThreads().
// When neither condition holds the call is a no-op.
//
// `_func` is invoked with no arguments and its result is ignored.
609template <typename FuncT>
610inline void
611ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
612 FuncT&& _func)
613{
614 if(m_tbb_tp && m_tbb_task_group)
615 {
616#if defined(PTL_USE_TBB)
617 // TBB lazily activates threads to process tasks and the main thread
618 // participates in processing the tasks so getting a specific
619 // function to execute only on the requested threads requires some trickery
620 //
 // ids of the threads that have already run _func during this call;
 // guarded by _mutex
621 std::set<std::thread::id> _first{};
622 Mutex _mutex{};
623 // runs _func at most once per thread: returns 1 when it ran, 0 otherwise
624 auto _exec = [&]() {
625 int _once = 0;
626 _mutex.lock();
627 if(_first.find(std::this_thread::get_id()) == _first.end())
628 {
629 // first time this thread is seen during this call: record its id
630 // so that _func is not executed on it a second time
631 _once = 1;
632 _first.insert(std::this_thread::get_id());
633 }
634 _mutex.unlock();
 // _func runs outside the lock so threads do not serialize on it
635 if(_once != 0)
636 {
637 _func();
638 return 1;
639 }
640 return 0;
641 };
642 // this will collect the number of threads which have
643 // executed the _exec function above
644 std::atomic<size_t> _total_exec{ 0 };
645 // number of cores
646 size_t _ncore = Threading::GetNumberOfCores();
647 // maximum depth for recursion
648 size_t _dmax = std::max<size_t>(_ncore, 8);
649 // how many threads must run _func: one per requested thread id
650 size_t _num = _tids.size();
651 // create a task arena
652 auto* _arena = get_task_arena();
653 // this is the task passed to the task-group
654 std::function<void()> _exec_task;
655 _exec_task = [&]() {
 // per-thread nesting depth of _exec_task, used to bound the recursion
657 static thread_local size_type _depth = 0;
658 int _ret = 0;
659 auto _this_tid = std::this_thread::get_id();
660 // only threads whose id is listed in _tids execute the function
661 if(_tids.count(_this_tid) > 0)
662 {
663 // execute the function
664 _ret = _exec();
665 // add the result
666 _total_exec += _ret;
667 }
668 // if _func did not run on this thread just now (and the depth and
669 // total limits allow it), recursively spawn two more tasks
670 ++_depth;
671 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
672 {
673 tbb::task_group tg{};
674 tg.run([&]() { _exec_task(); });
675 tg.run([&]() { _exec_task(); });
676 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
677 tg.wait();
678 }
679 --_depth;
680 };
681
682 // TBB won't oversubscribe so we need to limit by ncores - 1
683 size_t nitr = 0;
684 auto _fname = __FUNCTION__;
 // diagnostic line summarising how far the execution got
685 auto _write_info = [&]() {
686 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
687 << ", expected: " << _num << ", size: " << size() << std::endl;
688 };
689 while(_total_exec < _num)
690 {
 // submit 2 * _num - 1 tasks per round (pre-decrement runs first)
691 auto _n = 2 * _num;
692 while(--_n > 0)
693 {
694 _arena->execute(
695 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
696 }
 // block until every task submitted in this round has finished
697 _arena->execute([&]() { m_tbb_task_group->wait(); });
698 // don't loop infinitely but use a strict condition
 // NOTE(review): _func runs at most once per id in _tids, so
 // _total_exec cannot exceed _num and `(_total_exec - 1) == _num`
 // looks unreachable; only the cap below appears able to end a stalled
 // loop -- confirm the intended comparison.
699 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
700 {
701 _write_info();
702 break;
703 }
704 // at this point we need to exit
705 if(nitr > 8 * (_num + 1))
706 {
707 _write_info();
708 break;
709 }
710 }
711 if(get_verbose() > 3)
712 _write_info();
713#endif
714 }
715 else if(get_queue())
716 {
 // non-TBB pool: the task queue implements the per-thread execution
717 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
718 }
719}
720
721//======================================================================================//
722
723} // namespace PTL
#define PTL_DEFAULT_OBJECT(NAME)
Definition: Types.hh:86
static ThreadData *& GetInstance()
Definition: ThreadData.cc:32
std::vector< std::shared_ptr< ThreadData > > thread_data_t
Definition: ThreadPool.hh:111
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:115
void set_verbose(int n)
Definition: ThreadPool.hh:268
static bool get_default_use_cpu_affinity()
get the default use of cpu affinity
Definition: ThreadPool.hh:214
std::shared_ptr< Condition > condition_t
Definition: ThreadPool.hh:102
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:116
std::shared_ptr< std::atomic_uintmax_t > task_count_type
Definition: ThreadPool.hh:95
bool is_main() const
Definition: ThreadPool.hh:270
ThreadPool(ThreadPool &&)=default
std::map< uintmax_t, ThreadId > thread_index_map_t
Definition: ThreadPool.hh:109
void record_entry()
Definition: ThreadPool.cc:339
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:215
int insert(task_pointer &&, int=-1)
Definition: ThreadPool.hh:439
const pool_state_type & state() const
Definition: ThreadPool.hh:250
size_type add_task(task_pointer &&task, int bin=-1)
Definition: ThreadPool.hh:450
ThreadPool(const ThreadPool &)=delete
void set_priority(int _prio, Thread &) const
Definition: ThreadPool.cc:378
int run_on_this(task_pointer &&)
Definition: ThreadPool.hh:421
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:108
void set_finalization(finalize_func_t f)
Definition: ThreadPool.hh:237
static bool using_tbb()
Definition: ThreadPool.cc:151
static initialize_func_t & initialization_functor()
Definition: ThreadPool.hh:127
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:759
std::vector< Thread > thread_vec_t
Definition: ThreadPool.hh:110
size_type add_tasks(ListT &)
Definition: ThreadPool.hh:461
static size_type get_default_size()
get the default pool size
Definition: ThreadPool.hh:220
void execute_on_specific_threads(const std::set< std::thread::id > &_tid, FuncT &&_func)
Definition: ThreadPool.hh:611
size_t size_type
Definition: ThreadPool.hh:94
void resize(size_type _n)
Definition: ThreadPool.hh:413
virtual ~ThreadPool()
Definition: ThreadPool.cc:303
std::shared_ptr< Mutex > lock_t
Definition: ThreadPool.hh:101
static uintmax_t get_thread_id(ThreadId)
Definition: ThreadPool.cc:191
bool is_tbb_threadpool() const
Definition: ThreadPool.hh:192
std::shared_ptr< task_type > task_pointer
Definition: ThreadPool.hh:103
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:264
size_type stop_thread()
Definition: ThreadPool.cc:711
ThreadPool & operator=(const ThreadPool &)=delete
static bool get_default_use_tbb()
get the default use of tbb
Definition: ThreadPool.hh:212
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
Definition: ThreadPool.hh:96
static affinity_func_t & affinity_functor()
Definition: ThreadPool.hh:117
int get_active_threads_count() const
Definition: ThreadPool.hh:262
Thread * get_thread(std::thread::id id) const
std::shared_ptr< std::atomic_short > pool_state_type
Definition: ThreadPool.hh:97
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
Definition: ThreadPool.cc:223
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:384
ThreadPool & operator=(ThreadPool &&)=default
static void set_default_size(size_type _v)
set the default pool size
Definition: ThreadPool.hh:209
task_queue_t * get_queue() const
Definition: ThreadPool.hh:189
Thread * get_thread(size_type _n) const
static finalize_func_t & finalization_functor()
Definition: ThreadPool.hh:133
std::shared_ptr< std::atomic_bool > atomic_bool_type
Definition: ThreadPool.hh:98
int get_verbose() const
Definition: ThreadPool.hh:269
tbb_task_arena_t * get_task_arena()
Definition: ThreadPool.hh:392
bool using_affinity() const
Definition: ThreadPool.hh:256
static void set_default_verbose(int _v)
set the default verbosity
Definition: ThreadPool.hh:207
static void set_default_use_tbb(bool _v)
set the default use of tbb
Definition: ThreadPool.hh:201
void execute_on_all_threads(FuncT &&_func)
Definition: ThreadPool.hh:493
std::function< void()> finalize_func_t
Definition: ThreadPool.hh:114
static int get_default_verbose()
get the default verbosity
Definition: ThreadPool.hh:218
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:183
static int get_default_scheduling_priority()
get the default scheduling priority of threads in thread-pool
Definition: ThreadPool.hh:216
static void set_default_use_cpu_affinity(bool _v)
set the default use of cpu affinity
Definition: ThreadPool.cc:171
size_type size() const
Definition: ThreadPool.hh:252
std::deque< ThreadId > thread_list_t
Definition: ThreadPool.hh:106
size_type destroy_threadpool()
Definition: ThreadPool.cc:572
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:402
static void set_default_scheduling_priority(int _v)
set the default scheduling priority of threads in thread-pool
Definition: ThreadPool.hh:205
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:768
std::function< void()> initialize_func_t
Definition: ThreadPool.hh:113
void reset_finalization()
Definition: ThreadPool.hh:243
bool is_initialized() const
Definition: ThreadPool.cc:331
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
Definition: ThreadPool.hh:91
std::vector< bool > bool_list_t
Definition: ThreadPool.hh:107
void set_initialization(initialize_func_t f)
Definition: ThreadPool.hh:236
void reset_initialization()
Definition: ThreadPool.hh:239
static void set_use_tbb(bool _v)
Definition: ThreadPool.cc:159
void record_exit()
Definition: ThreadPool.cc:347
VUserTaskQueue task_queue_t
Definition: ThreadPool.hh:104
VTask is the abstract class stored in thread_pool.
Definition: VTask.hh:43
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
static size_t active_value(parameter param)
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
void run(FuncT f)
Definition: ThreadData.hh:69
unsigned GetNumberOfCores()
Definition: Threading.cc:63
Definition: AutoLock.hh:255
Thread::id ThreadId
Definition: Threading.hh:46
std::mutex Mutex
Definition: Threading.hh:57
std::thread Thread
Definition: Threading.hh:37
tbb::task_group tbb_task_group_t
Definition: ThreadData.hh:124
tbb::task_arena tbb_task_arena_t
Definition: ThreadData.hh:125
VUserTaskQueue * task_queue
Definition: ThreadPool.hh:152
affinity_func_t set_affinity
Definition: ThreadPool.hh:153
initialize_func_t initializer
Definition: ThreadPool.hh:154
finalize_func_t finalizer
Definition: ThreadPool.hh:155