14 #ifndef BOUNDED_THREAD_POOL_H
15 #define BOUNDED_THREAD_POOL_H
21 #include <condition_variable>
28 std::vector<std::thread> threads;
29 std::queue<std::function<void()>> tasks;
30 std::atomic<bool> b_running;
31 std::mutex mtx_queue_full;
32 std::mutex mtx_queue_empty;
33 std::condition_variable cnd_buffer_full;
34 std::condition_variable cnd_buffer_empty;
40 threads.push_back(std::thread(&BoundedThreadPool::worker,
this));
44 inline void execute_task(std::unique_lock<std::mutex> *lock)
46 std::function<void()> task = std::move(tasks.front());
48 cnd_buffer_full.notify_one();
53 inline void wait_for_task()
55 std::unique_lock<std::mutex> lock(mtx_queue_empty);
56 cnd_buffer_empty.wait(lock, [
this]
57 {
return !tasks.empty() || !b_running; });
72 catch (
const std::exception &e)
74 std::cerr << e.what() << std::endl;
86 std::unique_lock<std::mutex> lock(mtx_queue_full);
87 cnd_buffer_full.wait(lock, [
this]
88 {
return ((
int)tasks.size() <
limit); });
89 tasks.push(std::function<
void()>(task));
90 cnd_buffer_empty.notify_one();
93 template <
typename T,
typename... A>
94 inline void push_task(
const T &task,
const A &...args)
102 std::unique_lock<std::mutex> lock(mtx_queue_full);
103 cnd_buffer_full.wait(lock, [
this]
104 {
return tasks.empty(); });
117 int n_threads_max = std::thread::hardware_concurrency();
120 this->n_threads = n_threads_max;
147 cnd_buffer_empty.notify_all();
BoundedThreadPool(int n_threads, int limit)
void init(int n_threads, int limit)
void push_task(const T &task)
void push_task(const T &task, const A &...args)
BoundedThreadPool(int n_threads)
void wait_for_completion()