PicoScenes API Docs
 
Loading...
Searching...
No Matches
TaggedThreadPool.hxx
Go to the documentation of this file.
1//
2// Created by Zhiping Jiang on 9/12/22.
3//
4
5#ifndef PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
6#define PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
7
8#include <thread>
9#include <iostream>
10#include <any>
11#include <numeric>
12#include <vector>
13#include <deque>
14#include <functional>
15#include <condition_variable>
16#include <optional>
17#include <map>
18#include <shared_mutex>
19#include <algorithm>
20#include <chrono>
21#include "Singleton.hxx"
22#ifdef _WIN32
23#include "pthread.h"
24#endif
25
26using namespace std::chrono_literals;
27
36template<unsigned ThreadCountCompileTime = 1>
38public:
45 explicit TaggedThreadPool(const int threadCount = ThreadCountCompileTime, const std::string& threadNamePrefix = std::string{}) : poolState(true), threadNamePrefix(threadNamePrefix) {
47 }
48
54 void addThread(const int numThreads2Add) {
55 for (auto i = 0; i < numThreads2Add; i++) {
56 threads.emplace_back(std::thread([this] {
57 {
58 auto lock = std::unique_lock(poolMutex);
59 threadSequenceNumberMap[std::this_thread::get_id()] = threadSequenceNumberSource;
60 if (threadSpecificTasksMap.contains(threadSequenceNumberSource) || threadSpecificMutexes.contains(threadSequenceNumberSource))
61 throw std::runtime_error("Duplicate sequence number for threads");
62 threadSpecificTasksMap[threadSequenceNumberSource] = std::deque<std::pair<std::function<void(const std::any&)>, std::any>>();
63 threadSpecificMutexes[threadSequenceNumberSource] = std::make_unique<std::mutex>();
64 perThreadJobCount[threadSequenceNumberSource] = 0;
65 perThreadTimeOccupation[threadSequenceNumberSource] = std::make_pair(0.0, 0.0);
66 ++threadSequenceNumberSource; // 放在最后,是确保thread number是从0开始
67 setThreadNamePrefix();
68 }
69 serviceLoopPerThread();
70 }));
71 }
72 }
73
83 template<typename Task, typename Params, typename = std::enable_if_t<std::is_invocable_v<Task, const std::any &> && std::is_same_v<std::decay_t<Params>, std::any>>>
84 void addJob(Task&& task, Params&& params, const std::optional<uint32_t>& threadSN = std::nullopt) {
85 if (threadSN && threadSpecificTasksMap.contains(*threadSN)) {
86 auto lock = std::lock_guard(*threadSpecificMutexes[*threadSN]);
87 threadSpecificTasksMap[*threadSN].emplace_back(std::forward<Task>(task), std::forward<Params>(params));
88 } else {
89 auto lock = std::lock_guard(sharedPoolMutex);
90 sharedTaskQueue.emplace_back(std::forward<Task>(task), std::forward<Params>(params));
91 }
92 taskWaitCV.notify_all(); // Must be notify_all to prevent thread hanging
93 }
94
100 void AddJob(const std::function<void()>& task) {
101 addJob([task](const std::any&) {
102 task();
103 }, std::any{}, std::nullopt);
104 }
105
111 [[nodiscard]] size_t threadCount() const {
112 return threads.size();
113 }
114
120 [[nodiscard]] size_t taskCount() const {
121 auto taskCount = sharedTaskQueue.size();
122 taskCount += std::accumulate(threadSpecificTasksMap.cbegin(), threadSpecificTasksMap.cend(), 0,
123 [](size_t sum, const auto& pair) {
124 return sum + pair.second.size();
125 });
126 return taskCount;
127 }
128
133 shutdown();
134 }
135
136private:
// Run flag polled by every worker; atomic because shutdown() writes it from
// another thread while the workers are reading it.
std::atomic<bool> poolState{};
// Prefix combined with the thread sequence number to form each thread's name.
const std::string threadNamePrefix{"PS"};
// Next sequence number to hand out to a newly registered worker.
std::atomic<uint32_t> threadSequenceNumberSource{0};

// Threads and Thread-Ids
std::vector<std::thread> threads;
std::unordered_map<std::thread::id, uint32_t> threadSequenceNumberMap;

// Shared and Thread-Specific queues (the latter keyed by thread sequence number)
std::deque<std::pair<std::function<void(const std::any&)>, std::any>> sharedTaskQueue;
std::unordered_map<uint32_t, std::deque<std::pair<std::function<void(const std::any&)>, std::any>>> threadSpecificTasksMap;

// Queue mutexes: one for the shared queue, one per thread-specific queue
std::mutex sharedPoolMutex;
std::unordered_map<uint32_t, std::unique_ptr<std::mutex>> threadSpecificMutexes;

// Statistics, keyed by thread sequence number:
// number of executed jobs, and (accumulated job time, time since the thread came online).
std::unordered_map<uint32_t, int64_t> perThreadJobCount;
std::unordered_map<uint32_t, std::pair<double, double>> perThreadTimeOccupation;

// Guards the per-thread registries above (exclusive while a worker registers).
std::shared_mutex poolMutex;
// Mutex / condition variable pair on which idle workers wait for jobs.
std::mutex taskWaitMutex;
std::condition_variable taskWaitCV;
160
166 void serviceLoopPerThread() {
167 const auto threadOnlineTime = std::chrono::system_clock::now();
168 while (poolState) {
169 if (const auto jobPack = getNextJob()) {
170 auto start = std::chrono::system_clock::now();
171 jobPack->first(jobPack->second);
172 auto end = std::chrono::system_clock::now();
173 const auto threadId = std::this_thread::get_id();
174 perThreadJobCount[threadSequenceNumberMap[threadId]]++;
175 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].first += (end - start).count();
176 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].second = (end - threadOnlineTime).count();
177 }
178 }
179 }
180
186 std::optional<std::pair<std::function<void(const std::any&)>, std::any>> getNextJob() {
187 const auto threadId = std::this_thread::get_id();
188
189 auto shouldWait = [this, &threadId]() {
190 if (!sharedTaskQueue.empty())
191 return true;
192
193 auto lock = std::shared_lock(poolMutex);
194 auto threadSpecificResult = threadSequenceNumberMap.contains(threadId) && !threadSpecificTasksMap[threadSequenceNumberMap[threadId]].empty();
195 return threadSpecificResult;
196 };
197
198 auto taskWaitLock = std::unique_lock(taskWaitMutex);
199 taskWaitCV.wait(taskWaitLock, shouldWait);
200
201 if (!sharedTaskQueue.empty()) {
202 auto lock = std::lock_guard(sharedPoolMutex);
203 auto jobPack = sharedTaskQueue.front();
204 sharedTaskQueue.pop_front();
205 return std::move(jobPack);
206 }
207
208 if (const auto it = threadSequenceNumberMap.find(threadId); it != threadSequenceNumberMap.end() && !threadSpecificTasksMap[it->second].empty()) {
209 auto taskQueueOpLock = std::lock_guard(*threadSpecificMutexes[it->second]);
210 auto jobPack = threadSpecificTasksMap[it->second].front();
211 threadSpecificTasksMap[it->second].pop_front();
212 return jobPack;
213 }
214
215 return std::nullopt;
216 }
217
223 void setThreadNamePrefix() {
224 if (threadNamePrefix.empty())
225 return;
226
227#if !defined(__APPLE__)
228 if (threadNamePrefix.length() > 15)
229 throw std::invalid_argument("Thread name prefix MUST be shorter than 15 characters.");
230#endif
231
232 const size_t threadOrder = threadSequenceNumberMap[std::this_thread::get_id()];
233 std::string threadName = threadNamePrefix + std::to_string(threadOrder);
234
235#if !defined(__APPLE__)
236 if (pthread_setname_np(pthread_self(), threadName.c_str()))
237#else
238 if (pthread_setname_np(threadName.c_str()))
239#endif
240 throw std::runtime_error("failed to set name for current thread...");
241 }
242
248 void shutdown() {
249 auto currentThreadId = std::this_thread::get_id();
250 if (std::any_of(threads.cbegin(), threads.cend(), [currentThreadId](const std::thread& t) { return t.get_id() == currentThreadId; })) {
251 throw std::runtime_error("Cannot shutdown thread pool from within ...");
252 }
253
254 auto lock = std::lock_guard(poolMutex);
255 poolState = false;
256 taskWaitCV.notify_all();
257 for (auto& thread: threads) {
258 if (thread.joinable()) {
259 thread.join();
260 }
261 }
262
263 threadSequenceNumberMap.clear();
264 threadSpecificTasksMap.clear();
265 sharedTaskQueue.clear();
266 threads.clear();
267 }
268};
269
274
275#endif //PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
Singleton< TaggedThreadPool< 300 > > ThreadPoolSingleton
Singleton instance of a TaggedThreadPool with 300 threads.
A template class for creating singleton instances.
Definition Singleton.hxx:23
A thread pool with tagged threads for task management.
TaggedThreadPool(const int threadCount=ThreadCountCompileTime, const std::string &threadNamePrefix=std::string{})
Constructs a TaggedThreadPool with a specified number of threads.
void AddJob(const std::function< void()> &task)
Adds a simple job to the thread pool.
size_t taskCount() const
Gets the total number of tasks in the pool.
~TaggedThreadPool()
Destroys the thread pool and shuts down all threads.
void addJob(Task &&task, Params &&params, const std::optional< uint32_t > &threadSN=std::nullopt)
Adds a job to the thread pool. The optional threadSN is the sequence number of the target thread; sequence numbers are integers starting from 0.
void addThread(const int numThreads2Add)
Adds a specified number of threads to the pool.
size_t threadCount() const
Gets the number of threads in the pool.