// Constructor (only part of the body is visible in this view).
// Sets poolState to true, copies the `threadNamePrefix` argument into the
// member of the same name, and starts the worker threads.
// The argument defaults to an empty string; setThreadNamePrefix() tests the
// prefix for emptiness before it names a thread.
45 explicit TaggedThreadPool(
const int threadCount = ThreadCountCompileTime,
const std::string& threadNamePrefix = std::string{}) : poolState(true), threadNamePrefix(threadNamePrefix) {
// One worker is started per iteration. `numThreads2Add` is defined in lines
// not shown here — presumably derived from `threadCount`; TODO confirm.
55 for (
auto i = 0; i < numThreads2Add; i++) {
// Worker entry point: register the new thread, then run the service loop.
56 threads.emplace_back(std::thread([
this] {
// Exclusive lock on poolMutex while the bookkeeping below is filled in.
// Where this lock goes out of scope is not visible in this view.
58 auto lock = std::unique_lock(poolMutex);
// Map this thread's id to the next free sequence number.
59 threadSequenceNumberMap[std::this_thread::get_id()] = threadSequenceNumberSource;
// A sequence number that already owns a queue or a mutex must not be reused.
// NOTE(review): this throw happens inside the thread function; unless it is
// caught in lines not shown, an exception leaving a std::thread's function
// ends in std::terminate.
60 if (threadSpecificTasksMap.contains(threadSequenceNumberSource) || threadSpecificMutexes.contains(threadSequenceNumberSource))
61 throw std::runtime_error(
"Duplicate sequence number for threads");
// Create the worker's private job queue, the mutex that guards it, and its
// statistics slots (job counter and the pair of time figures).
62 threadSpecificTasksMap[threadSequenceNumberSource] = std::deque<std::pair<std::function<void(
const std::any&)>, std::any>>();
63 threadSpecificMutexes[threadSequenceNumberSource] = std::make_unique<std::mutex>();
64 perThreadJobCount[threadSequenceNumberSource] = 0;
65 perThreadTimeOccupation[threadSequenceNumberSource] = std::make_pair(0.0, 0.0);
// Advance the counter so the next worker receives a fresh sequence number.
66 ++threadSequenceNumberSource;
// Name the thread (reads the sequence number registered above), then start
// serving jobs.
67 setThreadNamePrefix();
69 serviceLoopPerThread();
83 template<
typename Task,
typename Params,
typename = std::enable_if_t<std::is_invocable_v<Task, const std::any &> && std::is_same_v<std::decay_t<Params>, std::any>>>
84 void addJob(Task&& task, Params&& params,
const std::optional<uint32_t>& threadSN = std::nullopt) {
85 if (threadSN && threadSpecificTasksMap.contains(*threadSN)) {
86 auto lock = std::lock_guard(*threadSpecificMutexes[*threadSN]);
87 threadSpecificTasksMap[*threadSN].emplace_back(std::forward<Task>(task), std::forward<Params>(params));
89 auto lock = std::lock_guard(sharedPoolMutex);
90 sharedTaskQueue.emplace_back(std::forward<Task>(task), std::forward<Params>(params));
92 taskWaitCV.notify_all();
// Convenience overload for a task that takes no argument: the task is copied
// into a lambda with the `const std::any&` signature that addJob() expects,
// and is queued with an empty std::any and no target thread (std::nullopt).
// The lambda's body is not visible in this view — presumably it just calls
// task(); TODO confirm.
100 void AddJob(
const std::function<
void()>& task) {
101 addJob([task](
const std::any&) {
103 }, std::any{}, std::nullopt);
// Number of std::thread objects currently stored in `threads`.
// (The enclosing function's signature is not visible in this view.)
112 return threads.size();
// Adds the sizes of all per-thread queues to `taskCount`.
// NOTE(review): the initial value given to std::accumulate is the int literal
// 0, so the running total is carried as int even though the lambda takes and
// returns size_t; passing size_t{0} would keep the sum in size_t.
122 taskCount += std::accumulate(threadSpecificTasksMap.cbegin(), threadSpecificTasksMap.cend(), 0,
123 [](
size_t sum,
const auto& pair) {
124 return sum + pair.second.size();
// Prefix used when naming worker threads (see setThreadNamePrefix()).
// The in-class default is "PS"; the constructor visible in this file
// initialises the member from its own argument instead.
138 const std::string threadNamePrefix{
"PS"};
// Next sequence number to hand to a starting worker; incremented once per
// registered worker.
139 std::atomic<uint32_t> threadSequenceNumberSource{0};
// The worker thread objects.
142 std::vector<std::thread> threads;
// Thread id -> sequence number of that worker.
143 std::unordered_map<std::thread::id, uint32_t> threadSequenceNumberMap;
// Jobs that any worker may take; each entry is (callable, argument).
146 std::deque<std::pair<std::function<void(
const std::any&)>, std::any>> sharedTaskQueue;
// Sequence number -> jobs reserved for that particular worker.
147 std::unordered_map<uint32_t, std::deque<std::pair<std::function<void(
const std::any&)>, std::any>>> threadSpecificTasksMap;
// Locked around pushes to and pops from sharedTaskQueue.
150 std::mutex sharedPoolMutex;
// Sequence number -> mutex locked around pushes to and pops from that
// worker's queue in threadSpecificTasksMap.
151 std::unordered_map<uint32_t, std::unique_ptr<std::mutex>> threadSpecificMutexes;
// Sequence number -> number of jobs that worker has executed.
154 std::unordered_map<uint32_t, int64_t> perThreadJobCount;
// Sequence number -> (accumulated job run time, time since the worker entered
// its service loop). Both values are written as raw std::chrono::system_clock
// duration tick counts, not as seconds.
155 std::unordered_map<uint32_t, std::pair<double, double>> perThreadTimeOccupation;
// Taken exclusively while a worker registers itself and during shutdown;
// taken shared by the wake-up predicate in getNextJob().
157 std::shared_mutex poolMutex;
// Mutex / condition variable pair on which idle workers wait for jobs.
158 std::mutex taskWaitMutex;
159 std::condition_variable taskWaitCV;
// Body executed by every worker: fetch a job, run it, update the worker's
// statistics. The enclosing loop construct is not visible in this view.
166 void serviceLoopPerThread() {
// Reference point for the "time since the worker came online" statistic.
167 const auto threadOnlineTime = std::chrono::system_clock::now();
// getNextJob() yields an empty optional when there is nothing to run.
169 if (
const auto jobPack = getNextJob()) {
// Run the callable with its stored argument and time the call.
170 auto start = std::chrono::system_clock::now();
171 jobPack->first(jobPack->second);
172 auto end = std::chrono::system_clock::now();
// Statistics are keyed by this worker's sequence number.
// NOTE(review): the three maps below are accessed through operator[] with no
// lock visible here, and the same sequence-number lookup is repeated three
// times; confirm that no other thread can modify these maps concurrently.
173 const auto threadId = std::this_thread::get_id();
174 perThreadJobCount[threadSequenceNumberMap[threadId]]++;
// .first accumulates job run time, .second is overwritten with the elapsed
// time since the worker started; both are raw system_clock tick counts.
175 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].first += (end - start).count();
176 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].second = (end - threadOnlineTime).count();
// Blocks the calling worker until a job is available, then removes and
// returns one job, preferring the shared queue over the worker's own queue.
// Several statements of this function are not visible in this view.
186 std::optional<std::pair<std::function<void(
const std::any&)>, std::any>> getNextJob() {
187 const auto threadId = std::this_thread::get_id();
// Predicate for the condition-variable wait below. Despite its name, a true
// result ends the wait (wait(lock, pred) keeps waiting while pred is false):
// the visible return value is true when this worker's own queue has a job.
// The statement following the shared-queue test is not visible — presumably
// `return true;`; TODO confirm.
// NOTE(review): sharedTaskQueue.empty() is read here without sharedPoolMutex.
189 auto shouldWait = [
this, &threadId]() {
190 if (!sharedTaskQueue.empty())
193 auto lock = std::shared_lock(poolMutex);
194 auto threadSpecificResult = threadSequenceNumberMap.contains(threadId) && !threadSpecificTasksMap[threadSequenceNumberMap[threadId]].empty();
195 return threadSpecificResult;
// Sleep until the predicate reports available work.
198 auto taskWaitLock = std::unique_lock(taskWaitMutex);
199 taskWaitCV.wait(taskWaitLock, shouldWait);
// Shared queue first.
// NOTE(review): the emptiness test is made before sharedPoolMutex is taken;
// if another worker can drain the queue in between, front() would be called
// on an empty deque. Re-checking under the lock would close that window.
// NOTE(review): front() is copied into jobPack (callable and argument) before
// the element is popped; moving it out would avoid the copy.
201 if (!sharedTaskQueue.empty()) {
202 auto lock = std::lock_guard(sharedPoolMutex);
203 auto jobPack = sharedTaskQueue.front();
204 sharedTaskQueue.pop_front();
205 return std::move(jobPack);
// Otherwise take the oldest job from this worker's own queue, if it has one.
// As above, the lookup and the emptiness test run before the queue's mutex is
// locked. The return statement of this branch is not visible in this view.
208 if (
const auto it = threadSequenceNumberMap.find(threadId); it != threadSequenceNumberMap.end() && !threadSpecificTasksMap[it->second].empty()) {
209 auto taskQueueOpLock = std::lock_guard(*threadSpecificMutexes[it->second]);
210 auto jobPack = threadSpecificTasksMap[it->second].front();
211 threadSpecificTasksMap[it->second].pop_front();
// Names the calling thread "<threadNamePrefix><sequence number>" through
// pthread_setname_np. Some preprocessor lines and statements of this function
// are not visible in this view.
// Throws std::invalid_argument when the prefix is longer than 15 characters
// (non-Apple builds) and std::runtime_error when pthread_setname_np reports a
// failure.
223 void setThreadNamePrefix() {
// Empty prefix: the statement that follows is not visible — presumably an
// early return that leaves the thread unnamed; TODO confirm.
224 if (threadNamePrefix.empty())
227#if !defined(__APPLE__)
// NOTE(review): only the prefix is measured, while the name actually set also
// carries the sequence-number digits, so it can be longer than what is checked
// here (Linux documents a 16-byte limit including the terminator — confirm for
// the target platforms). The message says "shorter than 15" although a prefix
// of exactly 15 characters passes this test.
228 if (threadNamePrefix.length() > 15)
229 throw std::invalid_argument(
"Thread name prefix MUST be shorter than 15 characters.");
// Sequence number registered for this thread. operator[] would insert 0 for an
// unknown thread id, so this relies on the caller having registered the
// thread beforehand.
232 const size_t threadOrder = threadSequenceNumberMap[std::this_thread::get_id()];
233 std::string threadName = threadNamePrefix + std::to_string(threadOrder);
// Two call forms: with an explicit thread handle on non-Apple builds, and the
// name-only form otherwise (the #else / #endif lines are not visible here).
// A non-zero return value is treated as failure.
235#if !defined(__APPLE__)
236 if (pthread_setname_np(pthread_self(), threadName.c_str()))
238 if (pthread_setname_np(threadName.c_str()))
240 throw std::runtime_error(
"failed to set name for current thread...");
// Pool shutdown (the enclosing function's signature is not visible in this
// view). Refuses to run on one of the pool's own threads, wakes every waiting
// worker, visits each joinable worker thread, and finally drops the
// bookkeeping maps and any jobs still queued.
249 auto currentThreadId = std::this_thread::get_id();
// A worker cannot shut down the pool it belongs to; that case is rejected
// with an exception.
250 if (std::any_of(threads.cbegin(), threads.cend(), [currentThreadId](
const std::thread& t) { return t.get_id() == currentThreadId; })) {
251 throw std::runtime_error(
"Cannot shutdown thread pool from within ...");
// Exclusive lock on poolMutex. Where it is released is not visible here.
// NOTE(review): the wake-up predicate in getNextJob() takes poolMutex in
// shared mode; if this exclusive lock is still held while the workers are
// joined below, confirm that the workers can still make progress and exit.
254 auto lock = std::lock_guard(poolMutex);
// Wake all workers blocked on the condition variable.
256 taskWaitCV.notify_all();
// The body of the joinable() branch is not visible — presumably
// thread.join(); TODO confirm.
257 for (
auto& thread: threads) {
258 if (thread.joinable()) {
// Discard thread registrations and all pending jobs. threadSpecificMutexes
// and the per-thread statistics maps are not cleared in the visible lines.
263 threadSequenceNumberMap.clear();
264 threadSpecificTasksMap.clear();
265 sharedTaskQueue.clear();