Geant4 11.2.2
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
G4TaskRunManager.cc
Go to the documentation of this file.
1//
2// ********************************************************************
3// * License and Disclaimer *
4// * *
5// * The Geant4 software is copyright of the Copyright Holders of *
6// * the Geant4 Collaboration. It is provided under the terms and *
7// * conditions of the Geant4 Software License, included in the file *
8// * LICENSE and available at http://cern.ch/geant4/license . These *
9// * include a list of copyright holders. *
10// * *
11// * Neither the authors of this software system, nor their employing *
12// * institutes,nor the agencies providing financial support for this *
13// * work make any representation or warranty, express or implied, *
14// * regarding this software system or assume any liability for its *
15// * use. Please see the license in the file LICENSE and URL above *
16// * for the full disclaimer and the limitation of liability. *
17// * *
18// * This code implementation is the result of the scientific and *
19// * technical work of the GEANT4 collaboration. *
20// * By using, copying, modifying or distributing the software (or *
21// * any work based on the software) you agree to acknowledge its *
22// * use in resulting scientific publications, and indicate your *
23// * acceptance of all terms of the Geant4 Software license. *
24// ********************************************************************
25//
26//
27
28#include "G4TaskRunManager.hh"
29
30#include "G4AutoLock.hh"
31#include "G4EnvironmentUtils.hh"
33#include "G4Run.hh"
34#include "G4ScoringManager.hh"
35#include "G4StateManager.hh"
36#include "G4Task.hh"
37#include "G4TaskGroup.hh"
38#include "G4TaskManager.hh"
41#include "G4ThreadPool.hh"
42#include "G4Threading.hh"
43#include "G4TiMemory.hh"
44#include "G4Timer.hh"
46#include "G4UImanager.hh"
47#include "G4UserRunAction.hh"
49#include "G4UserTaskQueue.hh"
53#include "G4WorkerThread.hh"
54
55#include <cstdlib>
56#include <cstring>
57#include <iterator>
58
59//============================================================================//
60
// File-local mutexes (internal linkage via the anonymous namespace).
// Each one is acquired through a G4AutoLock in this file:
//  - scorerMergerMutex : held by MergeScores() around masterScM->Merge(...)
//  - runMergerMutex    : held around currentRun->Merge(localRun)
//  - setUpEventMutex   : held by the event set-up routines that hand out seeds
61namespace
62{
63G4Mutex scorerMergerMutex;
64G4Mutex runMergerMutex;
65G4Mutex setUpEventMutex;
66} // namespace
67
68//============================================================================//
69
74
75//============================================================================//
76
78 : PTL::TaskRunManager(useTBB), eventGrainsize(grainsize)
79{
80 if (task_queue != nullptr) taskQueue = task_queue;
81
82 // override default of 2 from G4MTRunManager
84 fMasterRM = this;
85 MTkernel = static_cast<G4TaskRunManagerKernel*>(kernel);
86
87 G4int numberOfStaticAllocators = kernel->GetNumberOfStaticAllocators();
88 if (numberOfStaticAllocators > 0) {
90 msg1 << "There are " << numberOfStaticAllocators << " static G4Allocator objects detected.\n"
91 << "In multi-threaded mode, all G4Allocator objects must "
92 << "be dynamicly instantiated.";
93 G4Exception("G4TaskRunManager::G4TaskRunManager", "Run1035", FatalException, msg1);
94 }
95
98
99 // use default RandomNumberGenerator if created by user, or create default
100 masterRNGEngine = G4Random::getTheEngine();
101
104
105 //------------------------------------------------------------------------//
106 // handle threading
107 //------------------------------------------------------------------------//
108 auto _nthread_env = G4GetEnv<G4String>("G4FORCENUMBEROFTHREADS", "");
109 for (auto& itr : _nthread_env)
110 itr = (char)std::tolower(itr);
111
112 if (_nthread_env == "max")
114 else if (!_nthread_env.empty()) {
115 std::stringstream ss;
116 G4int _nthread_val = -1;
117 ss << _nthread_env;
118 ss >> _nthread_val;
119 if (_nthread_val > 0) forcedNwokers = _nthread_val;
120
122 }
123
124 //------------------------------------------------------------------------//
125 // option for forcing TBB
126 //------------------------------------------------------------------------//
127#ifdef GEANT4_USE_TBB
128 G4int _useTBB = G4GetEnv<G4int>("G4FORCE_TBB", (G4int)useTBB);
129 if (_useTBB > 0) useTBB = true;
130#else
131 if (useTBB) {
133 msg << "TBB was requested but Geant4 was not built with TBB support";
134 G4Exception("G4TaskRunManager::G4TaskRunManager(...)", "Run0131", JustWarning, msg);
135 }
136 useTBB = false;
137#endif
138
139 // handle TBB
141}
142
143//============================================================================//
144
146
147//============================================================================//
148
150{
151 // finalize profiler before shutting down the threads
153
154 // terminate all the workers
156
157 // trigger all G4AutoDelete instances
159
160 // delete the task-group
161 delete workTaskGroup;
162 workTaskGroup = nullptr;
163
164 // destroy the thread-pool
165 if (threadPool != nullptr) threadPool->destroy_threadpool();
166
168}
169
170//============================================================================//
171
176
177//============================================================================//
178
180{
181 std::ostringstream os;
182 os << randomNumberStatusDir << "G4Master_" << fn << ".rndm";
183 G4Random::saveEngineStatus(os.str().c_str());
184}
185
186//============================================================================//
187
189{
190 if (forcedNwokers > 0) {
191 if (verboseLevel > 0) {
193 msg << "\n### Number of threads is forced to " << forcedNwokers
194 << " by G4FORCENUMBEROFTHREADS environment variable. G4TaskRunManager::" << __FUNCTION__
195 << "(" << n << ") ignored ###";
196 G4Exception("G4TaskRunManager::SetNumberOfThreads(G4int)", "Run0132", JustWarning, msg);
197 }
199 }
200 else {
201 nworkers = n;
202 if (poolInitialized) {
203 if (verboseLevel > 0) {
204 std::stringstream ss;
205 ss << "\n### Thread-pool already initialized. Resizing to " << nworkers << "threads ###";
206 G4cout << ss.str() << "\n" << G4endl;
207 }
208 GetThreadPool()->resize(n);
209 }
210 }
211}
212
213//============================================================================//
214
216{
217 // If the ThreadPool isn't initialized, it will return 0 even if we've already
218 // set nworkers
220}
221
222//============================================================================//
223
225{
226 G4bool firstTime = (threadPool == nullptr);
227 if (firstTime) InitializeThreadPool();
228
230
231 // make sure all worker threads are set up.
233 if (firstTime) G4RunManager::SetRunIDCounter(0);
234 // G4UImanager::GetUIpointer()->SetIgnoreCmdNotFound(true);
235}
236
237//============================================================================//
238
240{
241 if (poolInitialized && (threadPool != nullptr) && (workTaskGroup != nullptr)) {
242 G4Exception("G4TaskRunManager::InitializeThreadPool", "Run1040", JustWarning,
243 "Threadpool already initialized. Ignoring...");
244 return;
245 }
246
249
250 // create the joiners
251 if (workTaskGroup == nullptr) {
253 }
254
255 if (verboseLevel > 0) {
256 std::stringstream ss;
257 ss.fill('=');
258 ss << std::setw(90) << "";
259 G4cout << "\n" << ss.str() << G4endl;
260
262 G4cout << "G4TaskRunManager :: Using TBB..." << G4endl;
263 }
264 else {
265 G4cout << "G4TaskRunManager :: Using G4ThreadPool..." << G4endl;
266 }
267
268 G4cout << ss.str() << "\n" << G4endl;
269 }
270}
271
272//============================================================================//
273
275{
276 // Nothing to do
277}
278
279//============================================================================//
280
282{
283 // Nothing to do
284}
285
286//============================================================================//
287
289{
290 G4int grainSize = (eventGrainsize == 0) ? (G4int)threadPool->size() : eventGrainsize;
291 grainSize = G4GetEnv<G4int>("G4FORCE_GRAINSIZE", grainSize, "Forcing grainsize...");
292 if (grainSize == 0) grainSize = 1;
293
294 G4int nEvtsPerTask =
295 (numberOfEventToBeProcessed > grainSize) ? (numberOfEventToBeProcessed / grainSize) : 1;
296
297 if (eventModuloDef > 0) {
299 }
300 else {
302 if (eventModulo < 1) eventModulo = 1;
303 }
304 if (eventModulo > nEvtsPerTask) {
305 G4int oldMod = eventModulo;
306 eventModulo = nEvtsPerTask;
307
309 msgd << "Event modulo is reduced to " << eventModulo << " (was " << oldMod << ")"
310 << " to distribute events to all threads.";
311 G4Exception("G4TaskRunManager::InitializeEventLoop()", "Run10035", JustWarning, msgd);
312 }
313 nEvtsPerTask = eventModulo;
314
315 if (fakeRun)
316 nEvtsPerTask = G4GetEnv<G4int>("G4FORCE_EVENTS_PER_TASK", nEvtsPerTask,
317 "Forcing number of events per task (overrides grainsize)...");
318 else
319 nEvtsPerTask = G4GetEnv<G4int>("G4FORCE_EVENTS_PER_TASK", nEvtsPerTask);
320
321 if (nEvtsPerTask < 1) nEvtsPerTask = 1;
322
323 numberOfTasks = numberOfEventToBeProcessed / nEvtsPerTask;
324 numberOfEventsPerTask = nEvtsPerTask;
325 eventModulo = numberOfEventsPerTask;
326
327 if (fakeRun && verboseLevel > 1) {
328 std::stringstream msg;
329 msg << "--> G4TaskRunManager::ComputeNumberOfTasks() --> " << numberOfTasks << " tasks with "
330 << numberOfEventsPerTask << " events/task...";
331
332 std::stringstream ss;
333 ss.fill('=');
334 ss << std::setw((G4int)msg.str().length()) << "";
335 G4cout << "\n" << ss.str() << "\n" << msg.str() << "\n" << ss.str() << "\n" << G4endl;
336 }
337}
338
339//============================================================================//
340
342{
343 // Now loop on requested number of workers
344 // This will also start the workers
345 // Currently we do not allow to change the
346 // number of threads: threads are created once
347 // Instead of pthread based workers, create tbbTask
348 static bool initializeStarted = false;
349
351
352 if (fakeRun) {
353 if (initializeStarted) {
354 auto initCmdStack = GetCommandStack();
355 if (!initCmdStack.empty()) {
356 threadPool->execute_on_all_threads([initCmdStack]() {
357 for (auto& itr : initCmdStack)
360 });
361 }
362 }
363 else {
364 std::stringstream msg;
365 msg << "--> G4TaskRunManager::CreateAndStartWorkers() --> "
366 << "Initializing workers...";
367
368 std::stringstream ss;
369 ss.fill('=');
370 ss << std::setw((G4int)msg.str().length()) << "";
371 G4cout << "\n" << ss.str() << "\n" << msg.str() << "\n" << ss.str() << "\n" << G4endl;
372
375 }
376 initializeStarted = true;
377 }
378 else {
379 auto initCmdStack = GetCommandStack();
380 if (!initCmdStack.empty()) {
381 threadPool->execute_on_all_threads([initCmdStack]() {
382 for (auto& itr : initCmdStack)
384 });
385 }
386
387 // cleans up a previous run and events in case a thread
388 // does not execute any tasks
390
391 {
392 std::stringstream msg;
393 msg << "--> G4TaskRunManager::CreateAndStartWorkers() --> "
394 << "Creating " << numberOfTasks << " tasks with " << numberOfEventsPerTask
395 << " events/task...";
396
397 std::stringstream ss;
398 ss.fill('=');
399 ss << std::setw((G4int)msg.str().length()) << "";
400 G4cout << "\n" << ss.str() << "\n" << msg.str() << "\n" << ss.str() << "\n" << G4endl;
401 }
402
404 for (G4int nt = 0; nt < numberOfTasks + 1; ++nt) {
405 if (remaining > 0) AddEventTask(nt);
406 remaining -= numberOfEventsPerTask;
407 }
409 }
410}
411
412//============================================================================//
413
415{
416 if (verboseLevel > 1) G4cout << "Adding task " << nt << " to task-group..." << G4endl;
418}
419
420//============================================================================//
421
423{
425 G4int nFill = 0;
426 switch (SeedOncePerCommunication()) {
427 case 0:
429 break;
430 case 1:
431 nFill = numberOfTasks - nSeedsFilled;
432 break;
433 case 2:
434 default:
436 }
437 // Generates up to nSeedsMax seed pairs only.
438 if (nFill > nSeedsMax) nFill = nSeedsMax;
439 masterRNGEngine->flatArray(nSeedsPerEvent * nFill, randDbl);
440 helper->Refill(randDbl, nFill);
441 nSeedsFilled += nFill;
442}
443
444//============================================================================//
445
// Prepares the master side of an event loop.
//
// Parameters:
//   n_event   - number of events requested; seeds are only prepared when it is > 0
//   macroFile - optional macro file name; when non-null it is stored in selectMacro
//               and appended to the /control/execute command text kept in msgText
//   n_select  - copied into n_select_msg; when a macro file is given and the value
//               is negative, n_event is used instead
//
// NOTE(review): this listing is missing some statements of the original function
// (the embedded listing numbers skip 449-450, 471, 482, 494, 499, 504, 513, 516
// and 519). The comments below describe only the lines that are visible here;
// confirm against the complete source file before relying on them.
446void G4TaskRunManager::InitializeEventLoop(G4int n_event, const char* macroFile, G4int n_select)
447{
448 MTkernel->SetUpDecayChannels();
451
// Everything in this branch is skipped when fakeRun is set.
452 if (!fakeRun) {
// Reset the seed bookkeeping counters for this loop.
453 nSeedsUsed = 0;
454 nSeedsFilled = 0;
455
456 if (verboseLevel > 0) timer->Start();
457
458 n_select_msg = n_select;
459 if (macroFile != nullptr) {
460 if (n_select_msg < 0) n_select_msg = n_event;
461
462 msgText = "/control/execute ";
463 msgText += macroFile;
464 selectMacro = macroFile;
465 }
466 else {
// No macro file: clear the selection count and the macro name.
467 n_select_msg = -1;
468 selectMacro = "";
469 }
470
472
473 // initialize seeds
474 // If user did not implement InitializeSeeds,
475 // use default: nSeedsPerEvent seeds per event
476
477 if (n_event > 0) {
// The default seeding below runs only if both the InitializeSeeds() overload
// and the initSeedsCallback functor return false.
478 G4bool _overload = InitializeSeeds(n_event);
479 G4bool _functor = false;
480 if (!_overload) _functor = initSeedsCallback(n_event, nSeedsPerEvent, nSeedsFilled);
481 if (!_overload && !_functor) {
// nSeedsFilled is chosen from the value of SeedOncePerCommunication().
483 switch (SeedOncePerCommunication()) {
484 case 0:
485 nSeedsFilled = n_event;
486 break;
487 case 1:
488 nSeedsFilled = numberOfTasks;
489 break;
490 case 2:
491 nSeedsFilled = n_event / eventModulo + 1;
492 break;
493 default:
// Unrecognised value: a JustWarning exception is issued and nSeedsFilled
// falls back to n_event (same value as case 0).
495 msgd << "Parameter value <" << SeedOncePerCommunication()
496 << "> of seedOncePerCommunication is invalid. It is reset "
497 "to 0.";
498 G4Exception("G4TaskRunManager::InitializeEventLoop()", "Run10036", JustWarning, msgd);
500 nSeedsFilled = n_event;
501 }
502
503 // Generates up to nSeedsMax seed pairs only.
// Draw nSeedsPerEvent * nSeedsFilled flat random numbers from the master
// engine into randDbl and pass them to the seed helper.
505 masterRNGEngine->flatArray(nSeedsPerEvent * nSeedsFilled, randDbl);
506 helper->Fill(randDbl, nSeedsFilled, n_event, nSeedsPerEvent);
507 }
508 }
509 }
510
511 // Now initialize workers. Check if user defined a WorkerThreadInitialization
512 if (userWorkerThreadInitialization == nullptr)
514
515 // Prepare UI commands for threads
517
518 // Start worker threads
520}
521
522//============================================================================//
523
525{
526 // Wait for all worker threads to have finished the run
527 // i.e. wait for them to return from RunTermination()
528 // This guarantees that the user run action for workers has been called
529
530 // Wait now for all threads to finish event-loop
532 // Now call base-class method
535}
536
537//============================================================================//
538
540{
542 // Call base class stuff...
544
545 masterWorlds.clear();
548 for (G4int iWorld = 0; iWorld < nWorlds; ++iWorld) {
549 addWorld(iWorld, *itrW);
550 ++itrW;
551 }
552}
553
554//============================================================================//
555
// Merges the given scoring manager into the master scoring manager (masterScM).
//
// The merge is performed while holding the file-local scorerMergerMutex through
// a scoped G4AutoLock. If masterScM is null the call does nothing apart from
// taking and releasing the lock.
//
// localScoringManager - scoring manager whose content is passed to
//                       masterScM->Merge(); it is forwarded as-is and not
//                       checked for null here.
556void G4TaskRunManager::MergeScores(const G4ScoringManager* localScoringManager)
557{
558 G4AutoLock l(&scorerMergerMutex);
559 if (masterScM != nullptr) masterScM->Merge(localScoringManager);
560}
561
562//============================================================================//
563
565{
566 G4AutoLock l(&runMergerMutex);
567 if (currentRun != nullptr) currentRun->Merge(localRun);
568}
569
570//============================================================================//
571
573 G4bool reseedRequired)
574{
575 G4AutoLock l(&setUpEventMutex);
578 if (reseedRequired) {
580 G4int idx_rndm = nSeedsPerEvent * nSeedsUsed;
581 s1 = helper->GetSeed(idx_rndm);
582 s2 = helper->GetSeed(idx_rndm + 1);
583 if (nSeedsPerEvent == 3) s3 = helper->GetSeed(idx_rndm + 2);
584 ++nSeedsUsed;
586 }
588 return true;
589 }
590 return false;
591}
592
593//============================================================================//
594
596{
597 G4AutoLock l(&setUpEventMutex);
599 G4int nevt = numberOfEventsPerTask;
600 G4int nmod = eventModulo;
604 }
606
607 if (reseedRequired) {
609 G4int nevRnd = nmod;
610 if (SeedOncePerCommunication() > 0) nevRnd = 1;
611 for (G4int i = 0; i < nevRnd; ++i) {
612 seedsQueue->push(helper->GetSeed(nSeedsPerEvent * nSeedsUsed));
613 seedsQueue->push(helper->GetSeed(nSeedsPerEvent * nSeedsUsed + 1));
614 if (nSeedsPerEvent == 3) seedsQueue->push(helper->GetSeed(nSeedsPerEvent * nSeedsUsed + 2));
615 nSeedsUsed++;
617 }
618 }
620 return nevt;
621 }
622 return 0;
623}
624
625//============================================================================//
626
628{
629 // Force workers to execute (if any) all UI commands left in the stack
631
632 if (workTaskGroup != nullptr) {
634 if (!fakeRun)
636 }
637}
638
639//============================================================================//
640
642{
643 // This method is valid only for GeomClosed or EventProc state
645 if (currentState == G4State_GeomClosed || currentState == G4State_EventProc) {
646 runAborted = true;
647 MTkernel->BroadcastAbortRun(softAbort);
648 }
649 else {
650 G4cerr << "Run is not in progress. AbortRun() ignored." << G4endl;
651 }
652}
653
654//============================================================================//
655
657{
658 // nothing to do in the master thread
659}
660
661//============================================================================//
662
672
673//============================================================================//
674
676{
678
679 auto process_commands_stack = []() {
681 if (mrm != nullptr) {
682 auto cmds = mrm->GetCommandStack();
683 for (const auto& itr : cmds)
684 G4UImanager::GetUIpointer()->ApplyCommand(itr); // TLS instance
686 }
687 };
688
689 if (threadPool != nullptr) threadPool->execute_on_all_threads(process_commands_stack);
690}
691
692//============================================================================//
693
695
696//============================================================================//
G4ApplicationState
@ G4State_EventProc
@ G4State_GeomClosed
_Tp G4GetEnv(const std::string &env_id, _Tp _default=_Tp())
@ JustWarning
@ FatalException
void G4Exception(const char *originOfException, const char *exceptionCode, G4ExceptionSeverity severity, const char *description)
std::ostringstream G4ExceptionDescription
std::queue< G4long > G4SeedsQueue
G4Thread::id G4ThreadId
std::mutex G4Mutex
double G4double
Definition G4Types.hh:83
long G4long
Definition G4Types.hh:87
bool G4bool
Definition G4Types.hh:86
int G4int
Definition G4Types.hh:85
G4GLOB_DLL std::ostream G4cerr
#define G4endl
Definition G4ios.hh:67
G4GLOB_DLL std::ostream G4cout
virtual void flatArray(const int size, double *vect)=0
void SetEventID(G4int i)
Definition G4Event.hh:85
static G4int SeedOncePerCommunication()
static void SetSeedOncePerCommunication(G4int val)
G4int numberOfEventToBeProcessed
virtual void ThisWorkerProcessCommandsStackDone()
static void addWorld(G4int counter, G4VPhysicalVolume *w)
virtual void PrepareCommandsStack()
static G4MTRUN_DLL G4ScoringManager * masterScM
static G4MTRUN_DLL G4MTRunManager * fMasterRM
static G4MTRUN_DLL masterWorlds_t masterWorlds
static G4MTRunManager * GetMasterRunManager()
static G4ThreadId GetMasterThreadId()
std::vector< G4String > GetCommandStack()
static void Finalize()
G4int GetNumberOfStaticAllocators() const
virtual void Initialize()
G4Timer * timer
void SetRunIDCounter(G4int i)
G4UserWorkerThreadInitialization * userWorkerThreadInitialization
G4int numberOfEventProcessed
G4RunManagerKernel * kernel
G4Run * currentRun
G4String msgText
virtual void BeamOn(G4int n_event, const char *macroFile=nullptr, G4int n_select=-1)
G4String selectMacro
virtual void RunTermination()
G4String randomNumberStatusDir
virtual void TerminateEventLoop()
virtual void ConstructScoringWorlds()
Definition G4Run.hh:49
virtual void Merge(const G4Run *)
Definition G4Run.cc:65
void Merge(const G4ScoringManager *scMan)
static G4ScoringManager * GetScoringManagerIfExist()
const G4ApplicationState & GetCurrentState() const
static G4StateManager * GetStateManager()
void BroadcastAbortRun(G4bool softAbort)
static std::vector< G4String > & InitCommandStack()
G4int GetNumberOfThreads() const override
void ConstructScoringWorlds() override
void InitializeThreadPool() override
void CreateAndStartWorkers() override
G4bool InitializeSeeds(G4int) override
static G4TaskRunManager * GetMasterRunManager()
void ThisWorkerProcessCommandsStackDone() override
G4int SetUpNEvents(G4Event *, G4SeedsQueue *seedsQueue, G4bool reseedRequired=true) override
G4VUserTaskQueue *& taskQueue
void SetNumberOfThreads(G4int n) override
void AbortRun(G4bool softAbort=false) override
InitializeSeedsCallback initSeedsCallback
void RefillSeeds() override
void RequestWorkersProcessCommandsStack() override
virtual void AddEventTask(G4int)
static G4ThreadId GetMasterThreadId()
void TerminateOneEvent() override
RunTaskGroup * workTaskGroup
void TerminateWorkers() override
void AbortEvent() override
~G4TaskRunManager() override
void InitializeEventLoop(G4int n_event, const char *macroFile=nullptr, G4int n_select=-1) override
void Initialize() override
void StoreRNGStatus(const G4String &filenamePrefix) override
void RunTermination() override
void MergeScores(const G4ScoringManager *localScoringManager)
void WaitForEndEventLoopWorkers() override
G4TaskGroup< void > RunTaskGroup
G4ThreadPool *& threadPool
static G4TaskRunManagerKernel * GetMTMasterRunManagerKernel()
void ProcessOneEvent(G4int i_event) override
void MergeRun(const G4Run *localRun)
virtual void ComputeNumberOfTasks()
G4TaskRunManager(G4bool useTBB=G4GetEnv< G4bool >("G4USE_TBB", false))
G4bool SetUpAnEvent(G4Event *, G4long &s1, G4long &s2, G4long &s3, G4bool reseedRequired=true) override
static G4TemplateRNGHelper< T > * GetInstance()
virtual const T GetSeed(const G4int &sdId)
void Fill(G4double *dbl, G4int nev, G4int nev_tot, G4int nrpe)
void Refill(G4double *dbl, G4int nev)
void Start()
static G4TransportationManager * GetTransportationManager()
std::vector< G4VPhysicalVolume * >::iterator GetWorldsIterator()
std::size_t GetNoWorlds() const
G4int ApplyCommand(const char *aCommand)
void SetMasterUIManager(G4bool val)
static G4UImanager * GetUIpointer()
static G4WorkerTaskRunManager * GetWorkerRunManager()
Up join(Up accum={})
Definition TaskGroup.hh:654
enable_if_t< std::is_void< Up >::value, void > exec(Func func, Args... args)
Definition TaskGroup.hh:531
virtual int GetNumberOfThreads() const
virtual void Initialize(uint64_t n=std::thread::hardware_concurrency())
virtual void Terminate()
void SetVerbose(int val)
ThreadPool * GetThreadPool() const
void resize(size_type _n)
bool is_tbb_threadpool() const
void execute_on_all_threads(FuncT &&_func)
size_type size() const
size_type destroy_threadpool()
static void set_use_tbb(bool _v)
G4int G4GetNumberOfCores()