riCOM_cpp
This repository contains the C++ implementation of the riCOM (Real Time Centre Of Mass) algorithm for 4D Scanning electron microscopy.
Loading...
Searching...
No Matches
BoundedThreadPool.hpp
Go to the documentation of this file.
1/* Copyright (C) 2021 Thomas Friedrich, Chu-Ping Yu,
2 * University of Antwerp - All Rights Reserved.
3 * You may use, distribute and modify
4 * this code under the terms of the GPL3 license.
5 * You should have received a copy of the GPL3 license with
6 * this file. If not, please visit:
7 * https://www.gnu.org/licenses/gpl-3.0.en.html
8 *
9 * Authors:
10 * Thomas Friedrich <thomas.friedrich@uantwerpen.be>
11 * Chu-Ping Yu <chu-ping.yu@uantwerpen.be>
12 */
13
14#ifndef BOUNDED_THREAD_POOL_H
15#define BOUNDED_THREAD_POOL_H
16
17#include <thread>
18#include <queue>
19#include <functional>
20#include <mutex>
21#include <condition_variable>
22#include <atomic>
23#include <iostream>
24
26{
27private:
28 std::vector<std::thread> threads;
29 std::queue<std::function<void()>> tasks;
30 std::atomic<bool> b_running;
31 std::mutex mtx_queue;
32 std::condition_variable cnd_buffer_full;
33 std::condition_variable cnd_buffer_empty;
34
35 void create_threads()
36 {
37 for (int i = 0; i < n_threads; i++)
38 {
39 threads.push_back(std::thread(&BoundedThreadPool::worker, this));
40 }
41 }
42
43 void wait_for_task()
44 {
45 std::unique_lock<std::mutex> lock(mtx_queue);
46 cnd_buffer_empty.wait(lock, [this]
47 { return !tasks.empty() || !b_running; });
48 if (!tasks.empty())
49 {
50 std::function<void()> task = std::move(tasks.front());
51 tasks.pop();
52 cnd_buffer_full.notify_one();
53 task();
54 }
55 }
56
57 void worker()
58 {
59 while (b_running)
60 {
61 try
62 {
63 wait_for_task();
64 }
65 catch (const std::exception &e)
66 {
67 std::cerr << e.what() << std::endl;
68 }
69 }
70 }
71
72public:
74 int limit;
75
76 template <typename T>
77 void push_task(const T &task)
78 {
79 std::unique_lock<std::mutex> lock(mtx_queue);
80 cnd_buffer_full.wait(lock, [this]
81 { return ((int)tasks.size() < limit); });
82 tasks.push(std::function<void()>(task));
83 cnd_buffer_empty.notify_one();
84 }
85
86 template <typename T, typename... A>
87 void push_task(const T &task, const A &...args)
88 {
89 push_task([task, args...]
90 { task(args...); });
91 }
92
94 {
95 std::unique_lock<std::mutex> lock(mtx_queue);
96 cnd_buffer_full.wait(lock, [this]
97 { return tasks.empty(); });
98 }
99
101 {
102 for (int i = 0; i < n_threads; i++)
103 {
104 threads[i].join();
105 }
106 }
107
108 void init(int n_threads, int limit)
109 {
110 int n_threads_max = std::thread::hardware_concurrency();
111 if (n_threads > n_threads_max || n_threads < 1)
112 {
113 this->n_threads = n_threads_max;
114 }
115 else
116 {
117 this->n_threads = n_threads;
118 }
119 this->limit = limit;
120 b_running = true;
121 create_threads();
122 }
123
125 {
127 }
128
130 {
132 }
133
134 BoundedThreadPool() : b_running(false), n_threads(0), limit(0) {}
135
137 {
139 b_running = false;
140 cnd_buffer_empty.notify_all();
141 join_threads();
142 }
143};
144
145#endif
BoundedThreadPool(int n_threads, int limit)
void init(int n_threads, int limit)
void push_task(const T &task)
void push_task(const T &task, const A &...args)
BoundedThreadPool(int n_threads)