riCOM_cpp
This repository contains the C++ implementation of the riCOM (Real Time Centre Of Mass) algorithm for 4D scanning transmission electron microscopy (4D-STEM).
BoundedThreadPool.hpp
Go to the documentation of this file.
1 /* Copyright (C) 2021 Thomas Friedrich, Chu-Ping Yu,
2  * University of Antwerp - All Rights Reserved.
3  * You may use, distribute and modify
4  * this code under the terms of the GPL3 license.
5  * You should have received a copy of the GPL3 license with
6  * this file. If not, please visit:
7  * https://www.gnu.org/licenses/gpl-3.0.en.html
8  *
9  * Authors:
10  * Thomas Friedrich <thomas.friedrich@uantwerpen.be>
11  * Chu-Ping Yu <chu-ping.yu@uantwerpen.be>
12  */
13 
14 #ifndef BOUNDED_THREAD_POOL_H
15 #define BOUNDED_THREAD_POOL_H
16 
17 #include <thread>
18 #include <queue>
19 #include <functional>
20 #include <mutex>
21 #include <condition_variable>
22 #include <atomic>
23 #include <iostream>
24 
26 {
27 private:
// Worker threads; filled by create_threads().
28  std::vector<std::thread> threads;
// FIFO of pending tasks; every access in this class happens with mtx_queue locked.
29  std::queue<std::function<void()>> tasks;
// Cleared to make the worker() loops exit; also part of the wake-up predicate in wait_for_task().
30  std::atomic<bool> b_running;
// Mutex used with both condition variables below.
31  std::mutex mtx_queue;
// Notified by wait_for_task() after a task is popped; waited on by push_task() until the
// queue holds fewer than `limit` tasks, and by the code that waits for the queue to be empty.
32  std::condition_variable cnd_buffer_full;
// Notified by push_task() after a task is queued; workers wait on it in wait_for_task().
33  std::condition_variable cnd_buffer_empty;
34 
35  void create_threads()
36  {
37  for (int i = 0; i < n_threads; i++)
38  {
39  threads.push_back(std::thread(&BoundedThreadPool::worker, this));
40  }
41  }
42 
43  void wait_for_task()
44  {
45  std::unique_lock<std::mutex> lock(mtx_queue);
46  cnd_buffer_empty.wait(lock, [this]
47  { return !tasks.empty() || !b_running; });
48  if (!tasks.empty())
49  {
50  std::function<void()> task = std::move(tasks.front());
51  tasks.pop();
52  cnd_buffer_full.notify_one();
53  task();
54  }
55  }
56 
57  void worker()
58  {
59  while (b_running)
60  {
61  try
62  {
63  wait_for_task();
64  }
65  catch (const std::exception &e)
66  {
67  std::cerr << e.what() << std::endl;
68  }
69  }
70  }
71 
72 public:
// Number of worker threads; set by init() and used by create_threads().
73  int n_threads;
// Maximum number of queued tasks; push_task() blocks while the queue size is not below it.
74  int limit;
75 
76  template <typename T>
77  void push_task(const T &task)
78  {
79  std::unique_lock<std::mutex> lock(mtx_queue);
80  cnd_buffer_full.wait(lock, [this]
81  { return ((int)tasks.size() < limit); });
82  tasks.push(std::function<void()>(task));
83  cnd_buffer_empty.notify_one();
84  }
85 
// Convenience overload: bind `args` to `task` and queue the resulting nullary callable.
// Both `task` and `args` are copied into the closure.
template <typename T, typename... A>
void push_task(const T &task, const A &...args)
{
    auto bound_call = [task, args...]
    { task(args...); };
    push_task(bound_call);
}
92 
94  {
95  std::unique_lock<std::mutex> lock(mtx_queue);
96  cnd_buffer_full.wait(lock, [this]
97  { return tasks.empty(); });
98  }
99 
101  {
102  for (int i = 0; i < n_threads; i++)
103  {
104  threads[i].join();
105  }
106  }
107 
108  void init(int n_threads, int limit)
109  {
110  int n_threads_max = std::thread::hardware_concurrency();
111  if (n_threads > n_threads_max || n_threads < 1)
112  {
113  this->n_threads = n_threads_max;
114  }
115  else
116  {
117  this->n_threads = n_threads;
118  }
119  this->limit = limit;
120  b_running = true;
121  create_threads();
122  }
123 
124  explicit BoundedThreadPool(int n_threads) : limit(8)
125  {
126  init(n_threads, limit);
127  }
128 
129  explicit BoundedThreadPool(int n_threads, int limit)
130  {
131  init(n_threads, limit);
132  }
133 
134  BoundedThreadPool() : b_running(false), n_threads(0), limit(0) {}
135 
137  {
139  b_running = false;
140  cnd_buffer_empty.notify_all();
141  join_threads();
142  }
143 };
144 
145 #endif
BoundedThreadPool(int n_threads, int limit)
void init(int n_threads, int limit)
void push_task(const T &task)
void push_task(const T &task, const A &...args)
BoundedThreadPool(int n_threads)