PicoScenes API Docs
 
Loading...
Searching...
No Matches
TaggedThreadPool.hxx
Go to the documentation of this file.
1//
2// Created by Zhiping Jiang on 9/12/22.
3//
4
5#ifndef PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
6#define PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
7
8#include <thread>
9#include <iostream>
10#include <any>
11#include <numeric>
12#include <vector>
13#include <deque>
14#include <functional>
15#include <condition_variable>
16#include <optional>
17#include <map>
18#include <shared_mutex>
19#include <algorithm>
20#include "Singleton.hxx"
21
22using namespace std::chrono_literals;
23
24template<unsigned ThreadCountCompileTime = 1>
26public:
/**
 * Creates the pool in the running state (poolState = true) and stores the prefix that
 * setThreadNamePrefix() later uses when naming worker threads.
 *
 * @param threadCount      number of workers requested; defaults to the ThreadCountCompileTime template argument.
 * @param threadNamePrefix prefix for worker thread names; with an empty prefix setThreadNamePrefix() returns
 *                         without naming the thread.
 *
 * NOTE(review): line 28 of the original listing is not present in this view, so the visible body never
 * reads threadCount. Presumably the missing line starts the workers via addThread(threadCount) -- confirm
 * against the real header.
 */
27 explicit TaggedThreadPool(const int threadCount = ThreadCountCompileTime, const std::string& threadNamePrefix = std::string{}) : poolState(true), threadNamePrefix(threadNamePrefix) {
29 }
30
31 void addThread(const int numThreads2Add) {
32 for (auto i = 0; i < numThreads2Add; i++) {
33 threads.emplace_back(std::thread([this] {
34 {
35 auto lock = std::unique_lock(poolMutex);
36 threadSequenceNumberMap[std::this_thread::get_id()] = threadSequenceNumberSource;
37 if (threadSpecificTasksMap.contains(threadSequenceNumberSource) || threadSpecificMutexes.contains(threadSequenceNumberSource))
38 throw std::runtime_error("Duplicate sequence number for threads");
39 threadSpecificTasksMap[threadSequenceNumberSource] = std::deque<std::pair<std::function<void(const std::any&)>, std::any>>();
40 threadSpecificMutexes[threadSequenceNumberSource] = std::make_unique<std::mutex>();
41 perThreadJobCount[threadSequenceNumberSource] = 0;
42 perThreadTimeOccupation[threadSequenceNumberSource] = std::make_pair(0.0, 0.0);
43 ++threadSequenceNumberSource; // 放在最后,是确保thread number是从0开始
44 setThreadNamePrefix();
45 }
46 serviceLoopPerThread();
47 }));
48 }
49 }
50
54 template<typename Task, typename Params, typename = std::enable_if_t<std::is_invocable_v<Task, const std::any &> && std::is_same_v<std::decay_t<Params>, std::any>>>
55 void addJob(Task&& task, Params&& params, const std::optional<uint32_t>& threadSN = std::nullopt) {
56 if (threadSN && threadSpecificTasksMap.contains(*threadSN)) {
57 auto lock = std::lock_guard(*threadSpecificMutexes[*threadSN]);
58 threadSpecificTasksMap[*threadSN].emplace_back(std::forward<Task>(task), std::forward<Params>(params));
59 } else {
60 auto lock = std::lock_guard(sharedPoolMutex);
61 sharedTaskQueue.emplace_back(std::forward<Task>(task), std::forward<Params>(params));
62 }
63 taskWaitCV.notify_all(); // Must be notify_all to prevent thread hanging
64 }
65
66 void AddJob(const std::function<void()>& task) {
67 addJob([task](const std::any&) {
68 task();
69 }, std::any{}, std::nullopt);
70 }
71
72 [[nodiscard]] size_t threadCount() const {
73 return threads.size();
74 }
75
76 [[nodiscard]] size_t taskCount() const {
77 auto taskCount = sharedTaskQueue.size();
78 taskCount += std::accumulate(threadSpecificTasksMap.cbegin(), threadSpecificTasksMap.cend(), 0,
79 [](size_t sum, const auto& pair) {
80 return sum + pair.second.size();
81 });
82 return taskCount;
83 }
84
86 shutdown();
87 }
88
89private:
90 bool poolState{};
91 const std::string threadNamePrefix{"PS"};
92 std::atomic<uint32_t> threadSequenceNumberSource{0};
93
94 // Threads and Thread-Ids
95 std::vector<std::thread> threads;
96 std::unordered_map<std::thread::id, uint32_t> threadSequenceNumberMap;
97
98 // Shared and Thread-Specific queues
99 std::deque<std::pair<std::function<void(const std::any&)>, std::any>> sharedTaskQueue;
100 std::unordered_map<uint32_t, std::deque<std::pair<std::function<void(const std::any&)>, std::any>>> threadSpecificTasksMap;
101
102 // queue mutexes
103 std::mutex sharedPoolMutex;
104 std::unordered_map<uint32_t, std::unique_ptr<std::mutex>> threadSpecificMutexes;
105
106 // Statistics
107 std::unordered_map<uint32_t, int64_t> perThreadJobCount;
108 std::unordered_map<uint32_t, std::pair<double, double>> perThreadTimeOccupation;
109
110 std::shared_mutex poolMutex;
111 std::mutex taskWaitMutex;
112 std::condition_variable taskWaitCV;
113
114 void serviceLoopPerThread() {
115 const auto threadOnlineTime = std::chrono::system_clock::now();
116 while (poolState) {
117 if (const auto jobPack = getNextJob()) {
118 auto start = std::chrono::system_clock::now();
119 jobPack->first(jobPack->second);
120 auto end = std::chrono::system_clock::now();
121 const auto threadId = std::this_thread::get_id();
122 perThreadJobCount[threadSequenceNumberMap[threadId]]++;
123 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].first += (end - start).count();
124 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].second = (end - threadOnlineTime).count();
125 }
126 }
127 }
128
129 std::optional<std::pair<std::function<void(const std::any&)>, std::any>> getNextJob() {
130 const auto threadId = std::this_thread::get_id();
131
132 auto shouldWait = [this, &threadId]() {
133 if (!sharedTaskQueue.empty())
134 return true;
135
136 auto lock = std::shared_lock(poolMutex);
137 auto threadSpecificResult = threadSequenceNumberMap.contains(threadId) && !threadSpecificTasksMap[threadSequenceNumberMap[threadId]].empty();
138 return threadSpecificResult;
139 };
140
141 auto taskWaitLock = std::unique_lock(taskWaitMutex);
142 taskWaitCV.wait(taskWaitLock, shouldWait);
143
144 if (!sharedTaskQueue.empty()) {
145 auto lock = std::lock_guard(sharedPoolMutex);
146 auto jobPack = sharedTaskQueue.front();
147 sharedTaskQueue.pop_front();
148 return std::move(jobPack);
149 }
150
151 if (const auto it = threadSequenceNumberMap.find(threadId); it != threadSequenceNumberMap.end() && !threadSpecificTasksMap[it->second].empty()) {
152 auto taskQueueOpLock = std::lock_guard(*threadSpecificMutexes[it->second]);
153 auto jobPack = threadSpecificTasksMap[it->second].front();
154 threadSpecificTasksMap[it->second].pop_front();
155 return jobPack;
156 }
157
158 return std::nullopt;
159 }
160
161 void setThreadNamePrefix() {
162 if (threadNamePrefix.empty())
163 return;
164
165#if !defined(__APPLE__)
166 if (threadNamePrefix.length() > 15)
167 throw std::invalid_argument("Thread name prefix MUST be shorter than 15 characters.");
168#endif
169
170 const size_t threadOrder = threadSequenceNumberMap[std::this_thread::get_id()];
171 std::string threadName = threadNamePrefix + std::to_string(threadOrder);
172
173#if !defined(__APPLE__)
174 if (pthread_setname_np(pthread_self(), threadName.c_str()))
175#else
176 if (pthread_setname_np(threadName.c_str()))
177#endif
178 throw std::runtime_error("failed to set name for current thread...");
179 }
180
181 void shutdown() {
182 auto currentThreadId = std::this_thread::get_id();
183 if (std::any_of(threads.cbegin(), threads.cend(), [currentThreadId](const std::thread& t) { return t.get_id() == currentThreadId; })) {
184 throw std::runtime_error("Cannot shutdown thread pool from within ...");
185 }
186
187 auto lock = std::lock_guard(poolMutex);
188 poolState = false;
189 taskWaitCV.notify_all();
190 for (auto& thread: threads) {
191 if (thread.joinable()) {
192 thread.join();
193 }
194 }
195
196 threadSequenceNumberMap.clear();
197 threadSpecificTasksMap.clear();
198 sharedTaskQueue.clear();
199 threads.clear();
200 }
201};
202
204
205
206#endif //PICOSCENES_PLATFORM_TAGGEDTHREADPOOL_HXX
Singleton< TaggedThreadPool< 300 > > ThreadPoolSingleton
TaggedThreadPool(const int threadCount=ThreadCountCompileTime, const std::string &threadNamePrefix=std::string{})
void AddJob(const std::function< void()> &task)
size_t taskCount() const
void addJob(Task &&task, Params &&params, const std::optional< uint32_t > &threadSN=std::nullopt)
void addThread(const int numThreads2Add)
size_t threadCount() const