// Constructs the pool and launches its worker threads.
//
// Parameters:
//   threadCount      - requested number of workers; defaults to
//                      ThreadCountCompileTime. The loop below runs up to
//                      `numThreads2Add`, whose derivation from threadCount is
//                      not visible in this excerpt.
//   threadNamePrefix - prefix later used by setThreadNamePrefix(); defaults to
//                      an empty string.
//
// The mem-initializer list sets poolState to true and initializes the
// threadNamePrefix member from the same-named parameter. A mem-initializer
// takes precedence over a member's default initializer, so the "PS" default on
// the member declaration is not used by this constructor.
27 explicit TaggedThreadPool(
const int threadCount = ThreadCountCompileTime,
const std::string& threadNamePrefix = std::string{}) : poolState(true), threadNamePrefix(threadNamePrefix) {
// One worker thread is created per iteration (`auto i = 0` deduces int).
32 for (
auto i = 0; i < numThreads2Add; i++) {
// The worker lambda captures `this` and calls member functions, so the pool
// object has to stay alive for as long as the worker runs.
33 threads.emplace_back(std::thread([
this] {
// Registration step: runs with poolMutex held exclusively. Where this lock is
// released is not visible in this excerpt.
35 auto lock = std::unique_lock(poolMutex);
// Associate this worker's std::thread::id with the next sequence number.
36 threadSequenceNumberMap[std::this_thread::get_id()] = threadSequenceNumberSource;
// The sequence number must not already own a queue or a queue mutex.
// NOTE(review): an exception escaping a std::thread entry function ends in
// std::terminate; confirm this throw (and those in setThreadNamePrefix) are
// caught in lines not shown here.
37 if (threadSpecificTasksMap.contains(threadSequenceNumberSource) || threadSpecificMutexes.contains(threadSequenceNumberSource))
38 throw std::runtime_error(
"Duplicate sequence number for threads");
// Create this worker's private (initially empty) job queue ...
39 threadSpecificTasksMap[threadSequenceNumberSource] = std::deque<std::pair<std::function<void(
const std::any&)>, std::any>>();
// ... the mutex that guards that queue ...
40 threadSpecificMutexes[threadSequenceNumberSource] = std::make_unique<std::mutex>();
// ... and zeroed statistics entries.
41 perThreadJobCount[threadSequenceNumberSource] = 0;
42 perThreadTimeOccupation[threadSequenceNumberSource] = std::make_pair(0.0, 0.0);
// Advance the counter so the next worker registers under a new number.
43 ++threadSequenceNumberSource;
// Name the OS thread from the prefix and this worker's sequence number.
44 setThreadNamePrefix();
// Hand control to the per-thread job servicing routine.
46 serviceLoopPerThread();
// Queues a job either on one worker's private queue or on the shared queue,
// then notifies waiting workers.
//
// Template constraints (enforced through std::enable_if_t): Task must be
// invocable with `const std::any&`, and Params must decay to std::any.
//
// Parameters:
//   task     - callable to execute; forwarded into the queue entry.
//   params   - std::any argument stored alongside the callable; forwarded.
//   threadSN - optional sequence number of the worker that should run the job;
//              defaults to std::nullopt.
54 template<
typename Task,
typename Params,
typename = std::enable_if_t<std::is_invocable_v<Task, const std::any &> && std::is_same_v<std::decay_t<Params>, std::any>>>
55 void addJob(Task&& task, Params&& params,
const std::optional<uint32_t>& threadSN = std::nullopt) {
// Targeted path: taken only when threadSN holds a value that is currently a
// key of threadSpecificTasksMap.
// NOTE(review): this contains() lookup happens before any lock is acquired in
// the visible lines; verify it cannot race with worker registration.
56 if (threadSN && threadSpecificTasksMap.contains(*threadSN)) {
// Guard the target worker's queue with that worker's own mutex.
57 auto lock = std::lock_guard(*threadSpecificMutexes[*threadSN]);
58 threadSpecificTasksMap[*threadSN].emplace_back(std::forward<Task>(task), std::forward<Params>(params));
// Shared path: the shared queue is guarded by sharedPoolMutex. The line that
// connects this to the branch above is not visible here (presumably `else`).
60 auto lock = std::lock_guard(sharedPoolMutex);
61 sharedTaskQueue.emplace_back(std::forward<Task>(task), std::forward<Params>(params));
// Wake every worker blocked on taskWaitCV.
// NOTE(review): workers wait on taskWaitCV under taskWaitMutex, while the
// queues are modified here under different mutexes. A worker that has just
// evaluated its wait predicate but has not yet blocked could miss this
// notification - verify.
63 taskWaitCV.notify_all();
// Convenience overload for jobs that take no argument.
//
// Wraps `task` in a lambda that accepts an unnamed `const std::any&` and
// queues it through addJob() with an empty std::any and std::nullopt as the
// target worker. `task` is captured by copy, so the queued job does not refer
// to the caller's std::function object.
// The wrapper lambda's body is not visible in this excerpt.
66 void AddJob(
const std::function<
void()>& task) {
67 addJob([task](
const std::any&) {
69 }, std::any{}, std::nullopt);
// Fragment: the enclosing function's header is not visible in this excerpt.
// Returns the number of std::thread objects stored in `threads`.
73 return threads.size();
// Fragment of a different function (header not visible): adds the number of
// jobs currently queued across all per-thread queues to taskCount.
// NOTE(review): the initial value given to std::accumulate is the int literal
// 0, so the running total is carried as int even though the lambda's `sum`
// parameter is size_t; size_t{0} would keep the whole fold in size_t.
78 taskCount += std::accumulate(threadSpecificTasksMap.cbegin(), threadSpecificTasksMap.cend(), 0,
// Each map element is (sequence number, queue); add that queue's length.
79 [](
size_t sum,
const auto& pair) {
80 return sum + pair.second.size();
// Prefix used when naming worker threads. "PS" is only the default member
// initializer; a constructor mem-initializer for this member replaces it.
91 const std::string threadNamePrefix{
"PS"};
// Next sequence number to hand to a registering worker; starts at 0.
92 std::atomic<uint32_t> threadSequenceNumberSource{0};
// The worker threads owned by the pool.
95 std::vector<std::thread> threads;
// Maps a worker's std::thread::id to its sequence number.
96 std::unordered_map<std::thread::id, uint32_t> threadSequenceNumberMap;
// Jobs that any worker may take. Each entry is (callable, std::any argument).
99 std::deque<std::pair<std::function<void(
const std::any&)>, std::any>> sharedTaskQueue;
// Per-worker job queues, keyed by sequence number; same entry layout as above.
100 std::unordered_map<uint32_t, std::deque<std::pair<std::function<void(
const std::any&)>, std::any>>> threadSpecificTasksMap;
// Locked around push/pop on sharedTaskQueue in addJob() and getNextJob().
103 std::mutex sharedPoolMutex;
// One mutex per worker (keyed by sequence number), locked around push/pop on
// that worker's private queue.
104 std::unordered_map<uint32_t, std::unique_ptr<std::mutex>> threadSpecificMutexes;
// Number of jobs each worker has executed, keyed by sequence number.
107 std::unordered_map<uint32_t, int64_t> perThreadJobCount;
// Per-worker timing, keyed by sequence number: `first` accumulates time spent
// running jobs, `second` holds the time elapsed since the worker came online.
// Both are stored as raw std::chrono::system_clock duration tick counts.
108 std::unordered_map<uint32_t, std::pair<double, double>> perThreadTimeOccupation;
// Taken exclusively during worker registration and in the shutdown routine,
// and in shared mode inside getNextJob()'s wait predicate.
110 std::shared_mutex poolMutex;
// Mutex/condition-variable pair on which idle workers block in getNextJob();
// addJob() and the shutdown routine call notify_all() on the condition variable.
111 std::mutex taskWaitMutex;
112 std::condition_variable taskWaitCV;
// Worker routine: fetches jobs through getNextJob(), runs them, and updates the
// calling worker's statistics. The loop construct surrounding the `if` below
// is not visible in this excerpt.
114 void serviceLoopPerThread() {
// Reference point for the "time since online" statistic.
115 const auto threadOnlineTime = std::chrono::system_clock::now();
// getNextJob() yields an optional (callable, argument) pair; an empty optional
// skips the body.
117 if (
const auto jobPack = getNextJob()) {
// Time the job and invoke the callable with its stored std::any argument.
// NOTE(review): system_clock can be adjusted while running; steady_clock is
// the usual choice for measuring intervals.
118 auto start = std::chrono::system_clock::now();
119 jobPack->first(jobPack->second);
120 auto end = std::chrono::system_clock::now();
121 const auto threadId = std::this_thread::get_id();
// Statistics update, keyed by this worker's sequence number.
// NOTE(review): no lock is taken in the visible lines while these shared maps
// are accessed through operator[]; verify this cannot overlap with worker
// registration or the shutdown routine's clear() calls.
122 perThreadJobCount[threadSequenceNumberMap[threadId]]++;
// `first` accumulates job run time as a raw tick count of the clock's duration.
123 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].first += (end - start).count();
// `second` is overwritten (not accumulated) with ticks elapsed since the
// worker came online.
124 perThreadTimeOccupation[threadSequenceNumberMap[threadId]].second = (end - threadOnlineTime).count();
// Blocks until work is available for the calling worker, then removes and
// returns one job as a (callable, std::any argument) pair. The shared queue is
// checked before the worker's private queue. The return statements for the
// private-queue and no-job cases are not visible in this excerpt.
129 std::optional<std::pair<std::function<void(
const std::any&)>, std::any>> getNextJob() {
130 const auto threadId = std::this_thread::get_id();
// Wait predicate. Despite its name, condition_variable::wait(lock, pred)
// returns once the predicate yields true, so a true result here means "stop
// waiting". Captures threadId by reference.
132 auto shouldWait = [
this, &threadId]() {
// Work on the shared queue (the statement controlled by this `if` is not
// visible in this excerpt).
// NOTE(review): sharedTaskQueue is read here without sharedPoolMutex.
133 if (!sharedTaskQueue.empty())
// Otherwise check, under a shared lock on poolMutex, whether this thread is
// registered and has jobs on its private queue.
136 auto lock = std::shared_lock(poolMutex);
137 auto threadSpecificResult = threadSequenceNumberMap.contains(threadId) && !threadSpecificTasksMap[threadSequenceNumberMap[threadId]].empty();
138 return threadSpecificResult;
// Block on taskWaitCV until the predicate above returns true.
141 auto taskWaitLock = std::unique_lock(taskWaitMutex);
142 taskWaitCV.wait(taskWaitLock, shouldWait);
// Shared queue first.
// NOTE(review): the emptiness test happens before sharedPoolMutex is locked;
// verify two workers cannot both pass it when only one job is queued.
144 if (!sharedTaskQueue.empty()) {
145 auto lock = std::lock_guard(sharedPoolMutex);
// front() is copied (callable and argument) before the entry is popped.
146 auto jobPack = sharedTaskQueue.front();
147 sharedTaskQueue.pop_front();
// NOTE(review): std::move on a returned local is redundant here.
148 return std::move(jobPack);
// Then this worker's private queue, if the thread is registered and the queue
// is non-empty; the queue is modified under the worker's own mutex.
151 if (
const auto it = threadSequenceNumberMap.find(threadId); it != threadSequenceNumberMap.end() && !threadSpecificTasksMap[it->second].empty()) {
152 auto taskQueueOpLock = std::lock_guard(*threadSpecificMutexes[it->second]);
153 auto jobPack = threadSpecificTasksMap[it->second].front();
154 threadSpecificTasksMap[it->second].pop_front();
// Names the calling thread "<threadNamePrefix><sequence number>" using
// pthread_setname_np.
//
// Throws:
//   std::invalid_argument - on non-Apple builds, when the prefix is longer than
//                           15 characters.
//   std::runtime_error    - when pthread_setname_np reports failure.
161 void setThreadNamePrefix() {
// Empty-prefix case; the statement controlled by this `if` is not visible in
// this excerpt (presumably an early return).
162 if (threadNamePrefix.empty())
// NOTE(review): Linux limits thread names to 16 bytes including the
// terminating NUL. This check covers the prefix only and still accepts exactly
// 15 characters (the message says "shorter than 15"), so the prefix plus the
// appended number can exceed the limit and fail in pthread_setname_np below.
165#if !defined(__APPLE__)
166 if (threadNamePrefix.length() > 15)
167 throw std::invalid_argument(
"Thread name prefix MUST be shorter than 15 characters.");
// Look up this thread's sequence number. operator[] inserts a
// default-constructed entry if the thread id is not registered.
170 const size_t threadOrder = threadSequenceNumberMap[std::this_thread::get_id()];
171 std::string threadName = threadNamePrefix + std::to_string(threadOrder);
// Non-Apple signature takes the target thread handle and the name.
173#if !defined(__APPLE__)
174 if (pthread_setname_np(pthread_self(), threadName.c_str()))
// Single-argument signature, which names the calling thread (the preprocessor
// line selecting this branch is not visible in this excerpt).
176 if (pthread_setname_np(threadName.c_str()))
// A non-zero return from pthread_setname_np is treated as failure.
178 throw std::runtime_error(
"failed to set name for current thread...");
// Fragment of the pool's shutdown routine; the enclosing function header and
// several statements are not visible in this excerpt.
// Refuse to run on one of the pool's own worker threads.
182 auto currentThreadId = std::this_thread::get_id();
183 if (std::any_of(threads.cbegin(), threads.cend(), [currentThreadId](
const std::thread& t) { return t.get_id() == currentThreadId; })) {
184 throw std::runtime_error(
"Cannot shutdown thread pool from within ...");
// Take poolMutex exclusively.
// NOTE(review): the scope of this lock is not visible. If it is still held
// while the workers below are examined, a worker that needs a shared lock on
// poolMutex inside getNextJob()'s wait predicate would block - verify.
187 auto lock = std::lock_guard(poolMutex);
// Wake every worker blocked on taskWaitCV.
189 taskWaitCV.notify_all();
// Visit every worker thread that is still joinable (the statement executed for
// such threads is not visible in this excerpt).
190 for (
auto& thread: threads) {
191 if (thread.joinable()) {
// Drop worker registrations and discard any jobs that are still queued.
196 threadSequenceNumberMap.clear();
197 threadSpecificTasksMap.clear();
198 sharedTaskQueue.clear();