Eclipse SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2004-2019 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials
5 // are made available under the terms of the Eclipse Public License v2.0
6 // which accompanies this distribution, and is available at
7 // http://www.eclipse.org/legal/epl-v20.html
8 // SPDX-License-Identifier: EPL-2.0
9 /****************************************************************************/
15 // A thread class together with a pool and a task for parallelized computation
16 /****************************************************************************/
17 
18 #ifndef FXWorkerThread_h
19 #define FXWorkerThread_h
20 
21 // #define WORKLOAD_PROFILING
22 // number of batches (waitAll() calls) after which the averaged maximum workload of the threads is reported; needs WORKLOAD_PROFILING
23 // undefine to get the summary report only
24 #define WORKLOAD_INTERVAL 100
25 
26 // ===========================================================================
27 // included modules
28 // ===========================================================================
#include <config.h>

#include <list>
#include <vector>
#include <fx.h>
#ifdef WORKLOAD_PROFILING
#include <chrono>
#include <limits>
#include <utils/common/MsgHandler.h>
#include <utils/common/ToString.h>
#endif
#include <utils/common/UtilExceptions.h>
40 
41 
42 // ===========================================================================
43 // class definitions
44 // ===========================================================================
49 class FXWorkerThread : public FXThread {
50 
51 public:
56  class Task {
57  public:
59  virtual ~Task() {};
60 
69  virtual void run(FXWorkerThread* context) = 0;
70 
77  void setIndex(const int newIndex) {
78  myIndex = newIndex;
79  }
80  private:
82  int myIndex;
83  };
84 
89  class Pool {
90  public:
97  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
98 #ifdef WORKLOAD_PROFILING
99  , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
100 #endif
101  {
102 #ifdef WORKLOAD_PROFILING
103  long long int timeDiff = 0;
104  for (int i = 0; i < 100; i++) {
105  const auto begin = std::chrono::high_resolution_clock::now();
106  const auto end = std::chrono::high_resolution_clock::now();
107  timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
108  }
109  //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
110 #endif
111  while (numThreads > 0) {
112  new FXWorkerThread(*this);
113  numThreads--;
114  }
115  }
116 
121  virtual ~Pool() {
122  clear();
123  }
124 
127  void clear() {
128  for (FXWorkerThread* const worker : myWorkers) {
129  delete worker;
130  }
131  myWorkers.clear();
132  }
133 
138  void addWorker(FXWorkerThread* const w) {
139  myWorkers.push_back(w);
140  }
141 
148  void add(Task* const t, int index = -1) {
149  if (index < 0) {
150  index = myRunningIndex % myWorkers.size();
151  }
152 #ifdef WORKLOAD_PROFILING
153  if (myRunningIndex == 0) {
154  for (FXWorkerThread* const worker : myWorkers) {
155  worker->startProfile();
156  }
157  myProfileStart = std::chrono::high_resolution_clock::now();
158  }
159 #endif
160  t->setIndex(myRunningIndex++);
161  myWorkers[index]->add(t);
162  }
163 
170  void addFinished(std::list<Task*>& tasks) {
171  myMutex.lock();
172  myFinishedTasks.splice(myFinishedTasks.end(), tasks);
173  myCondition.signal();
174  myMutex.unlock();
175  }
176 
178  myMutex.lock();
179  if (myException == nullptr) {
180  myException = new ProcessError(e);
181  }
182  myMutex.unlock();
183  }
184 
186  void waitAll(const bool deleteFinished = true) {
187  myMutex.lock();
188  while ((int)myFinishedTasks.size() < myRunningIndex) {
189  myCondition.wait(myMutex);
190  }
191 #ifdef WORKLOAD_PROFILING
192  if (myRunningIndex > 0) {
193  const auto end = std::chrono::high_resolution_clock::now();
194  const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
195  double minLoad = std::numeric_limits<double>::max();
196  double maxLoad = 0.;
197  for (FXWorkerThread* const worker : myWorkers) {
198  const double load = worker->endProfile(elapsed);
199  minLoad = MIN2(minLoad, load);
200  maxLoad = MAX2(maxLoad, load);
201  }
202 #ifdef WORKLOAD_INTERVAL
203  myTotalMaxLoad += maxLoad;
204  myTotalSpread += maxLoad / minLoad;
205  myNumBatches++;
206  if (myNumBatches % WORKLOAD_INTERVAL == 0) {
207  WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
208  myTotalMaxLoad = 0.;
209  myTotalSpread = 0.;
210  }
211 #endif
212  }
213 #endif
214  if (deleteFinished) {
215  for (Task* task : myFinishedTasks) {
216  delete task;
217  }
218  }
219  ProcessError* toRaise = myException;
220  myException = nullptr;
221  myFinishedTasks.clear();
222  myRunningIndex = 0;
223  myMutex.unlock();
224  if (toRaise != nullptr) {
225  throw* toRaise;
226  }
227  }
228 
236  bool isFull() const {
237  return myRunningIndex - (int)myFinishedTasks.size() >= size();
238  }
239 
244  int size() const {
245  return (int)myWorkers.size();
246  }
247 
249  void lock() {
250  myPoolMutex.lock();
251  }
252 
254  void unlock() {
255  myPoolMutex.unlock();
256  }
257 
258  private:
260  std::vector<FXWorkerThread*> myWorkers;
262  FXMutex myMutex;
264  FXMutex myPoolMutex;
266  FXCondition myCondition;
268  std::list<Task*> myFinishedTasks;
273 #ifdef WORKLOAD_PROFILING
274  int myNumBatches;
277  double myTotalMaxLoad;
279  double myTotalSpread;
281  std::chrono::high_resolution_clock::time_point myProfileStart;
282 #endif
283  };
284 
285 public:
292  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
293 #ifdef WORKLOAD_PROFILING
294  , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
295 #endif
296  {
297  pool.addWorker(this);
298  start();
299  }
300 
305  virtual ~FXWorkerThread() {
306  stop();
307 #ifdef WORKLOAD_PROFILING
308  const double load = 100. * myTotalBusyTime / myTotalTime;
309  WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
310  " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
311  "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
312 #endif
313  }
314 
319  void add(Task* t) {
320  myMutex.lock();
321  myTasks.push_back(t);
322  myCondition.signal();
323  myMutex.unlock();
324  }
325 
332  FXint run() {
333  while (!myStopped) {
334  myMutex.lock();
335  while (!myStopped && myTasks.empty()) {
336  myCondition.wait(myMutex);
337  }
338  if (myStopped) {
339  myMutex.unlock();
340  break;
341  }
342  myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
343  myMutex.unlock();
344  try {
345  for (Task* const t : myCurrentTasks) {
346 #ifdef WORKLOAD_PROFILING
347  const auto before = std::chrono::high_resolution_clock::now();
348 #endif
349  t->run(this);
350 #ifdef WORKLOAD_PROFILING
351  const auto after = std::chrono::high_resolution_clock::now();
352  myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
353  myCounter++;
354 #endif
355  }
356  } catch (ProcessError& e) {
357  myPool.setException(e);
358  }
360  }
361  return 0;
362  }
363 
368  void stop() {
369  myMutex.lock();
370  myStopped = true;
371  myCondition.signal();
372  myMutex.unlock();
373  join();
374  }
375 
376 #ifdef WORKLOAD_PROFILING
377  void startProfile() {
378  myBusyTime = 0;
379  }
380 
381  double endProfile(const long long int time) {
382  myTotalTime += time;
383  myTotalBusyTime += myBusyTime;
384  return time == 0 ? 100. : 100. * myBusyTime / time;
385  }
386 #endif
387 
388 private:
392  FXMutex myMutex;
394  FXCondition myCondition;
396  std::list<Task*> myTasks;
398  std::list<Task*> myCurrentTasks;
400  bool myStopped;
401 #ifdef WORKLOAD_PROFILING
402  int myCounter;
405  long long int myBusyTime;
407  long long int myTotalBusyTime;
409  long long int myTotalTime;
410 #endif
411 };
412 
413 
414 #endif
std::vector< FXWorkerThread * > myWorkers
the current worker threads
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
std::list< Task * > myTasks
the list of pending tasks
int myRunningIndex
the running index for the next task
FXWorkerThread(Pool &pool)
Constructor.
virtual ~Pool()
Destructor.
T MAX2(T a, T b)
Definition: StdDefs.h:80
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the internal mutex for the task list
FXMutex myPoolMutex
the pool mutex for external sync
FXint run()
Main execution method of this thread.
void lock()
locks the pool mutex
bool myStopped
whether we are still running
int myIndex
the index of the task, valid only after the task has been added to the pool
std::list< Task * > myFinishedTasks
list of finished tasks
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition: ToString.h:48
Pool(int numThreads=0)
Constructor.
void addFinished(std::list< Task *> &tasks)
Adds the given tasks to the list of finished tasks.
int size() const
Returns the number of threads in the pool.
FXMutex myMutex
the mutex for the task list
void clear()
Stops and deletes all worker threads.
virtual ~Task()
Destructor.
T MIN2(T a, T b)
Definition: StdDefs.h:74
ProcessError * myException
the exception from a child thread
void setException(ProcessError &e)
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index. If the index is negative, assign to the next (round robin) one.
#define WORKLOAD_INTERVAL
A pool of worker threads which distributes the tasks and collects the results.
bool isFull() const
Checks whether there are currently at least as many pending tasks as threads.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Pool & myPool
the pool for this thread
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void unlock()
unlocks the pool mutex
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
void stop()
Stops the thread.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
A thread repeatedly calculating incoming tasks.
FXCondition myCondition
the semaphore to wait on for finishing all tasks
#define WRITE_MESSAGE(msg)
Definition: MsgHandler.h:240
void setIndex(const int newIndex)
Sets the running index of this task.