riCOM_cpp
This repository contains the C++ implementation of the riCOM (Real Time Centre Of Mass) algorithm for 4D Scanning electron microscopy.
BoundedThreadPool.hpp
Go to the documentation of this file.
1 /* Copyright (C) 2021 Thomas Friedrich, Chu-Ping Yu,
2  * University of Antwerp - All Rights Reserved.
3  * You may use, distribute and modify
4  * this code under the terms of the GPL3 license.
5  * You should have received a copy of the GPL3 license with
6  * this file. If not, please visit:
7  * https://www.gnu.org/licenses/gpl-3.0.en.html
8  *
9  * Authors:
10  * Thomas Friedrich <thomas.friedrich@uantwerpen.be>
11  * Chu-Ping Yu <chu-ping.yu@uantwerpen.be>
12  */
13 
14 #ifndef BOUNDED_THREAD_POOL_H
15 #define BOUNDED_THREAD_POOL_H
16 
17 #include <thread>
18 #include <queue>
19 #include <functional>
20 #include <mutex>
21 #include <condition_variable>
22 #include <atomic>
23 #include <iostream>
24 
26 {
27 private:
28  std::vector<std::thread> threads;
29  std::queue<std::function<void()>> tasks;
30  std::atomic<bool> b_running;
31  std::mutex mtx_queue_full;
32  std::mutex mtx_queue_empty;
33  std::condition_variable cnd_buffer_full;
34  std::condition_variable cnd_buffer_empty;
35 
36  void create_threads()
37  {
38  for (int i = 0; i < n_threads; i++)
39  {
40  threads.push_back(std::thread(&BoundedThreadPool::worker, this));
41  }
42  }
43 
44  inline void execute_task(std::unique_lock<std::mutex> *lock)
45  {
46  std::function<void()> task = std::move(tasks.front());
47  tasks.pop();
48  cnd_buffer_full.notify_one();
49  lock->unlock();
50  task();
51  lock->lock();
52  }
53  inline void wait_for_task()
54  {
55  std::unique_lock<std::mutex> lock(mtx_queue_empty);
56  cnd_buffer_empty.wait(lock, [this]
57  { return !tasks.empty() || !b_running; });
58  if (!tasks.empty())
59  {
60  execute_task(&lock);
61  }
62  }
63 
64  void worker()
65  {
66  while (b_running)
67  {
68  try
69  {
70  wait_for_task();
71  }
72  catch (const std::exception &e)
73  {
74  std::cerr << e.what() << std::endl;
75  }
76  }
77  }
78 
79 public:
80  int n_threads;
81  int limit;
82 
83  template <typename T>
84  inline void push_task(const T &task)
85  {
86  std::unique_lock<std::mutex> lock(mtx_queue_full);
87  cnd_buffer_full.wait(lock, [this]
88  { return ((int)tasks.size() < limit); });
89  tasks.push(std::function<void()>(task));
90  cnd_buffer_empty.notify_one();
91  }
92 
93  template <typename T, typename... A>
94  inline void push_task(const T &task, const A &...args)
95  {
96  push_task([task, args...]
97  { task(args...); });
98  }
99 
101  {
102  std::unique_lock<std::mutex> lock(mtx_queue_full);
103  cnd_buffer_full.wait(lock, [this]
104  { return tasks.empty(); });
105  }
106 
108  {
109  for (int i = 0; i < n_threads; i++)
110  {
111  threads[i].join();
112  }
113  }
114 
115  void init(int n_threads, int limit)
116  {
117  int n_threads_max = std::thread::hardware_concurrency();
118  if (n_threads > n_threads_max || n_threads < 1)
119  {
120  this->n_threads = n_threads_max;
121  }
122  else
123  {
124  this->n_threads = n_threads;
125  }
126  this->limit = limit;
127  b_running = true;
128  create_threads();
129  }
130 
131  explicit BoundedThreadPool(int n_threads) : limit(8)
132  {
133  init(n_threads, limit);
134  }
135 
136  explicit BoundedThreadPool(int n_threads, int limit)
137  {
138  init(n_threads, limit);
139  }
140 
141  BoundedThreadPool() : b_running(false), n_threads(0), limit(0) {}
142 
144  {
146  b_running = false;
147  cnd_buffer_empty.notify_all();
148  join_threads();
149  }
150 };
151 
152 #endif
BoundedThreadPool(int n_threads, int limit)
void init(int n_threads, int limit)
void push_task(const T &task)
void push_task(const T &task, const A &...args)
BoundedThreadPool(int n_threads)