Maestro 0.1.0
Unified interface for quantum circuit simulation
Loading...
Searching...
No Matches
ThreadsPool.h
Go to the documentation of this file.
11
12#pragma once
13
14#ifndef __THREADS_POOL_H_
15#define __THREADS_POOL_H_
16
17#include <condition_variable>
18#include <queue>
19#include <vector>
20
21#include <fstream>
22
23#include "WorkerThread.h"
24
25namespace Utils {
26
38template <class Job> class ThreadsPool {
39 using JobWorkerThread = WorkerThread<ThreadsPool<Job>, Job>;
40 friend class WorkerThread<ThreadsPool<Job>, Job>;
41
42public:
50 explicit ThreadsPool(int nrThreads = 0) {
51 if (nrThreads <= 0)
52 nrThreads = 1;
53
54 for (int i = 0; i < nrThreads; ++i)
55 Threads.emplace_back(std::make_unique<JobWorkerThread>(this));
56 }
57
65
72 void Stop() {
73 {
74 std::lock_guard lock(Mutex);
75
76 for (auto &worker : Threads)
77 worker->SetStopUnlocked();
78 }
79
80 NotifyAll();
81
82 for (auto &worker : Threads)
83 worker->Join();
84
85 Threads.clear();
86 }
87
93 void Start() {
94 for (auto &worker : Threads)
95 worker->Start();
96 }
97
105 void AddRunJob(const std::shared_ptr<Job> &job) {
106 {
107 std::lock_guard lock(Mutex);
108 JobsQueue.push(job);
109 }
110
111 NotifyOne();
112 }
113
121 void AddRunJob(std::shared_ptr<Job> &&job) {
122 {
123 std::lock_guard lock(Mutex);
124 JobsQueue.push(std::move(job));
125 }
126
127 NotifyOne();
128 }
129
139 do {
140 std::unique_lock lock(FinishMutex);
141
142 if (FinishCount >= FinishLimit)
143 return;
144
145 ConditionFinish.wait(lock, [this] { return FinishCount >= FinishLimit; });
146
147 if (FinishCount >= FinishLimit)
148 return;
149 } while (true);
150 }
151
162 void Resize(size_t nrThreads) {
163 if (nrThreads <= 0)
164 nrThreads = 1;
165
166 size_t oldSize = Threads.size();
167
168 if (oldSize == nrThreads)
169 return;
170
171 if (oldSize < nrThreads) {
172 for (size_t i = oldSize; i < nrThreads; ++i) {
173 Threads.emplace_back(std::make_unique<JobWorkerThread>(this));
174 Threads.back()->Start();
175 }
176 } else {
177 {
178 std::lock_guard lock(Mutex);
179 for (size_t i = oldSize; i < nrThreads; ++i)
180 Threads[i]->SetStopUnlocked();
181 }
182
183 NotifyAll();
184
185 for (size_t i = oldSize; i < nrThreads; ++i)
186 Threads[i]->Join();
187 Threads.resize(nrThreads);
188 }
189 }
190
201 void SetFinishLimit(size_t limit) {
202 std::unique_lock lock(FinishMutex);
203 FinishLimit = limit;
204 FinishCount = 0;
205 }
206
207private:
213 inline void NotifyOne() { Condition.notify_one(); }
214
221 inline void NotifyAll() { Condition.notify_all(); }
222
229 inline bool HasWork() const { return !JobsQueue.empty(); }
230
238 void NotifyFinish() { ConditionFinish.notify_all(); }
239
240 std::mutex Mutex;
241 std::condition_variable
242 Condition;
243
244 std::mutex FinishMutex;
245 std::condition_variable ConditionFinish;
247
248 size_t FinishCount = 0;
249 size_t FinishLimit =
250 std::numeric_limits<size_t>::max();
252
253 std::queue<std::shared_ptr<Job>>
254 JobsQueue;
255
256 std::vector<std::unique_ptr<JobWorkerThread>>
257 Threads;
258};
259
260} // namespace Utils
261
262#endif // __THREADS_POOL_H_
void Start()
Start all the threads in the threads pool.
Definition ThreadsPool.h:93
void AddRunJob(const std::shared_ptr< Job > &job)
Add a job to be executed by the threads pool.
void WaitForFinish()
Wait for all jobs to finish.
void SetFinishLimit(size_t limit)
Set the finish limit.
void AddRunJob(std::shared_ptr< Job > &&job)
Add a job to be executed by the threads pool.
void Resize(size_t nrThreads)
Resize the threads pool.
void Stop()
Stop all the threads in the threads pool.
Definition ThreadsPool.h:72
~ThreadsPool()
Destructor.
Definition ThreadsPool.h:64
ThreadsPool(int nrThreads=0)
Construct a new Thread pool object.
Definition ThreadsPool.h:50
WorkerThread class for a thread in a threads pool.
Definition Alias.h:20