Maestro 0.1.0
Unified interface for quantum circuit simulation
Loading...
Searching...
No Matches
ThreadsPool.h
Go to the documentation of this file.
12#pragma once
13
14#ifndef __THREADS_POOL_H_
15#define __THREADS_POOL_H_
16
17#include <condition_variable>
18#include <queue>
19#include <vector>
20
21#include <fstream>
22
23#include "WorkerThread.h"
24
25namespace Utils {
26
38template <class Job>
41 friend class WorkerThread<ThreadsPool<Job>, Job>;
42
43 public:
51 explicit ThreadsPool(int nrThreads = 0) {
52 if (nrThreads <= 0) nrThreads = 1;
53
54 for (int i = 0; i < nrThreads; ++i)
55 Threads.emplace_back(std::make_unique<JobWorkerThread>(this));
56 }
57
65
72 void Stop() {
73 {
74 std::lock_guard lock(Mutex);
75
76 for (auto &worker : Threads) worker->SetStopUnlocked();
77 }
78
79 NotifyAll();
80
81 for (auto &worker : Threads) worker->Join();
82
83 Threads.clear();
84 }
85
91 void Start() {
92 for (auto &worker : Threads) worker->Start();
93 }
94
102 void AddRunJob(const std::shared_ptr<Job> &job) {
103 {
104 std::lock_guard lock(Mutex);
105 JobsQueue.push(job);
106 }
107
108 NotifyOne();
109 }
110
118 void AddRunJob(std::shared_ptr<Job> &&job) {
119 {
120 std::lock_guard lock(Mutex);
121 JobsQueue.push(std::move(job));
122 }
123
124 NotifyOne();
125 }
126
136 do {
137 std::unique_lock lock(FinishMutex);
138
139 if (FinishCount >= FinishLimit) return;
140
141 ConditionFinish.wait(lock, [this] { return FinishCount >= FinishLimit; });
142
143 if (FinishCount >= FinishLimit) return;
144 } while (true);
145 }
146
157 void Resize(size_t nrThreads) {
158 if (nrThreads <= 0) nrThreads = 1;
159
160 size_t oldSize = Threads.size();
161
162 if (oldSize == nrThreads) return;
163
164 if (oldSize < nrThreads) {
165 for (size_t i = oldSize; i < nrThreads; ++i) {
166 Threads.emplace_back(std::make_unique<JobWorkerThread>(this));
167 Threads.back()->Start();
168 }
169 } else {
170 {
171 std::lock_guard lock(Mutex);
172 for (size_t i = oldSize; i < nrThreads; ++i)
173 Threads[i]->SetStopUnlocked();
174 }
175
176 NotifyAll();
177
178 for (size_t i = oldSize; i < nrThreads; ++i) Threads[i]->Join();
179 Threads.resize(nrThreads);
180 }
181 }
182
193 void SetFinishLimit(size_t limit) {
194 std::unique_lock lock(FinishMutex);
195 FinishLimit = limit;
196 FinishCount = 0;
197 }
198
199 private:
205 inline void NotifyOne() { Condition.notify_one(); }
206
213 inline void NotifyAll() { Condition.notify_all(); }
214
221 inline bool HasWork() const { return !JobsQueue.empty(); }
222
230 void NotifyFinish() { ConditionFinish.notify_all(); }
231
232 std::mutex Mutex;
233 std::condition_variable
234 Condition;
236 std::mutex FinishMutex;
237 std::condition_variable ConditionFinish;
240 size_t FinishCount = 0;
241 size_t FinishLimit =
242 std::numeric_limits<size_t>::max();
245 std::queue<std::shared_ptr<Job>>
246 JobsQueue;
248 std::vector<std::unique_ptr<JobWorkerThread>>
249 Threads;
250};
251
252} // namespace Utils
253
254#endif // __THREADS_POOL_H_
ThreadsPool class for holding and controlling a pool of threads.
Definition ThreadsPool.h:39
void Start()
Start all the threads in the threads pool.
Definition ThreadsPool.h:91
void AddRunJob(const std::shared_ptr< Job > &job)
Add a job to be executed by the threads pool.
void WaitForFinish()
Wait for all jobs to finish.
void SetFinishLimit(size_t limit)
Set the finish limit.
void AddRunJob(std::shared_ptr< Job > &&job)
Add a job to be executed by the threads pool.
void Resize(size_t nrThreads)
Resize the threads pool.
void Stop()
Stop all the threads in the threads pool.
Definition ThreadsPool.h:72
~ThreadsPool()
Destructor.
Definition ThreadsPool.h:64
ThreadsPool(int nrThreads=0)
Construct a new Thread pool object.
Definition ThreadsPool.h:51
WorkerThread class for a thread in a threads pool.
Definition Alias.h:20