Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
VTaskGroup.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21//
22// Class Description:
23//
24// This file creates an abstract base class for the grouping the thread-pool
25// tasking system into independently joinable units
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#include "PTL/VTaskGroup.hh"
32#include "PTL/Globals.hh"
33#include "PTL/Task.hh"
34#include "PTL/TaskRunManager.hh"
35#include "PTL/ThreadData.hh"
36#include "PTL/ThreadPool.hh"
37#include "PTL/VTask.hh"
38
39using namespace PTL;
40
41//======================================================================================//
42
43std::atomic_uintmax_t&
45{
46 static std::atomic_uintmax_t _instance(0);
47 return _instance;
48}
49
50//======================================================================================//
51
52int VTaskGroup::f_verbose = GetEnv<int>("PTL_VERBOSE", 0);
53
54//======================================================================================//
55
57: m_id(vtask_group_counter()++)
58, m_pool(tp)
59, m_tot_task_count(std::make_shared<atomic_int>(0))
60, m_task_cond(std::make_shared<condition_t>())
61, m_task_lock(std::make_shared<lock_t>())
62, m_main_tid(std::this_thread::get_id())
63{
66
67 if(!m_pool)
68 {
69 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
70 << "nullptr to thread pool!" << std::endl;
71 }
72}
73
74//======================================================================================//
75
77
78//======================================================================================//
79
80void
82{
83 // if no pool was initially present at creation
84 if(!m_pool)
85 {
86 // check for master MT run-manager
89
90 // if MTRunManager does not exist or no thread pool created
91 if(!m_pool)
92 {
93 if(f_verbose > 0)
94 {
95 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
96 __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
97 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
98 << "nullptr to thread pool!" << std::endl;
99 }
100 return;
101 }
102 }
103
105 if(!data)
106 return;
107
108 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
109 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;
110
111 bool _is_master = data->is_master;
112 bool _within_task = data->within_task;
113
114 auto is_active_state = [&]() {
115 return (tpool->state()->load(std::memory_order_relaxed) !=
116 thread_pool::state::STOPPED);
117 };
118
119 auto execute_this_threads_tasks = [&]() {
120 if(!taskq)
121 return;
122
123 // only want to process if within a task
124 if((!_is_master || tpool->size() < 2) && _within_task)
125 {
126 int bin = static_cast<int>(taskq->GetThreadBin());
127 // const auto nitr = (tpool) ? tpool->size() : Thread::hardware_concurrency();
128 while(this->pending() > 0)
129 {
130 task_pointer _task = taskq->GetTask(bin);
131 if(_task)
132 (*_task)();
133 }
134 }
135 };
136
137 // checks for validity
139 {
140 // for external threads
141 if(!_is_master || tpool->size() < 2)
142 return;
143 }
144 else if(f_verbose > 0)
145 {
146 if(!tpool || !taskq)
147 {
148 // something is wrong, didn't create thread-pool?
149 fprintf(
150 stderr,
151 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue (%p)\n",
152 __FUNCTION__, __LINE__, static_cast<void*>(tpool),
153 static_cast<void*>(taskq));
154 }
155 // return if thread pool isn't built
156 else if(is_native_task_group() && !tpool->is_alive())
157 {
158 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
159 __FUNCTION__, __LINE__);
160 }
161 else if(!is_active_state())
162 {
163 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
164 __FUNCTION__, __LINE__);
165 }
166 }
167
168 intmax_t wake_size = 2;
169 AutoLock _lock(*m_task_lock, std::defer_lock);
170
171 while(is_active_state())
172 {
173 execute_this_threads_tasks();
174
175 // while loop protects against spurious wake-ups
176 while(_is_master && pending() > 0 && is_active_state())
177 {
178 // auto _wake = [&]() { return (wake_size > pending() || !is_active_state());
179 // };
180
181 // lock before sleeping on condition
182 if(!_lock.owns_lock())
183 _lock.lock();
184
185 // Wait until signaled that a task has been competed
186 // Unlock mutex while wait, then lock it back when signaled
187 // when true, this wakes the thread
188 if(pending() >= wake_size)
189 {
190 m_task_cond->wait(_lock);
191 }
192 else
193 {
194 m_task_cond->wait_for(_lock, std::chrono::microseconds(100));
195 }
196 // unlock
197 if(_lock.owns_lock())
198 _lock.unlock();
199 }
200
201 // if pending is not greater than zero, we are joined
202 if(pending() <= 0)
203 break;
204 }
205
206 if(_lock.owns_lock())
207 _lock.unlock();
208
209 intmax_t ntask = this->task_count().load();
210 if(ntask > 0)
211 {
212 std::stringstream ss;
213 ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
214 << "are running!" << std::endl;
215 std::cerr << ss.str();
216 this->wait();
217 }
218}
219
220//======================================================================================//
std::atomic_uintmax_t & vtask_group_counter()
Definition: VTaskGroup.cc:44
static TaskRunManager * GetMasterRunManager(bool useTBB=false)
ThreadPool * GetThreadPool() const
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
VUserTaskQueue * current_queue
Definition: ThreadData.hh:115
ThreadPool * thread_pool
Definition: ThreadData.hh:114
const pool_state_type & state() const
Definition: ThreadPool.hh:149
task_queue_t * get_queue() const
Definition: ThreadPool.hh:135
size_type size() const
Definition: ThreadPool.hh:151
VTaskGroup(ThreadPool *tp=nullptr)
Definition: VTaskGroup.cc:56
std::atomic_intmax_t atomic_int
Definition: VTaskGroup.hh:66
atomic_int & task_count()
Definition: VTaskGroup.hh:130
virtual ~VTaskGroup()
Definition: VTaskGroup.cc:76
ThreadPool * m_pool
Definition: VTaskGroup.hh:136
std::shared_ptr< condition_t > m_task_cond
Definition: VTaskGroup.hh:138
virtual void wait()
Definition: VTaskGroup.cc:81
virtual intmax_t pending()
Definition: VTaskGroup.hh:119
Condition condition_t
Definition: VTaskGroup.hh:68
static int f_verbose
Definition: VTaskGroup.hh:142
virtual bool is_native_task_group() const
Definition: VTaskGroup.hh:114
std::shared_ptr< lock_t > m_task_lock
Definition: VTaskGroup.hh:139
VTask is the abstract class stored in thread_pool.
Definition: VTask.hh:55
Definition: AutoLock.hh:254