PicoScenes API Docs
 
Loading...
Searching...
No Matches
AsyncPipeline.hxx
Go to the documentation of this file.
1//
2// Created by Zhiping Jiang on 22-7-21.
3//
4
5#ifndef PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
6#define PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
7
8#include "SystemTools.hxx"
10
11using namespace std::chrono_literals;
12
23template<typename ObjectType>
25public:
34 pipelineState = true;
35 pipelinePaused = false;
36 std::this_thread::sleep_for(1ms);
38 serviceLoop();
39 });
40 return *this;
41 }
42
49 void stopService() {
50 if (!pipelineState)
51 return;
52
53 pipelineState = false;
54 pipelinePaused = false;
55 bufferMutex.lock();
56 handleMutex.lock();
57 handlers.clear();
58 buffer.clear();
59 objectTrackingIdQueue.clear();
60 bufferMutex.unlock();
61 handleMutex.unlock();
62 std::this_thread::sleep_for(200ms);
63 }
64
72 template<typename T>
73 uint64_t send(T&& object) {
74 std::lock_guard lock(bufferMutex);
75 buffer.emplace_back(std::forward<T>(object));
76 auto trackingId = currentTrackingId;
77 objectTrackingIdQueue.emplace_back(currentTrackingId++);
78 bufferCV.notify_all();
79 return trackingId;
80 }
81
89 template<typename T>
90 uint64_t sendSync(T&& object) {
91 auto trackingId = send(std::forward<T>(object));
92 std::unique_lock syncLock(sendSyncMutex);
93 while (pipelineState) {
94 syncSendCV.wait_for(syncLock, 1ms, [&]() {
95 return std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end();
96 });
97
98 if (std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end())
99 break;
100 }
101 return trackingId;
102 }
103
109 int pause() {
110 pipelinePaused = true;
111 pauseCV.notify_all();
112 return 0;
113 }
114
120 int resume() {
121 pipelinePaused = false;
122 pauseCV.notify_all();
123 return 0;
124 }
125
133 AsyncPipeline<ObjectType> &registerAsyncHandler(std::string handlerName, std::function<void(const ObjectType &object)> handler) {
134 auto guardHandle = std::lock_guard(handleMutex);
135 handlers[handlerName] = handler;
136
137 return *this;
138 }
139
148 std::optional<ObjectType> selectDataByCondition(const std::function<bool(const ObjectType &)> &condition, const std::chrono::milliseconds &maxWait_ms, std::optional<std::string> conditionName) {
149 thread_local static uint64_t conditionCount = 0;
150 std::optional<ObjectType> returnValue;
151 std::unique_lock syncLock(handleSyncMutex);
152 if (!conditionName)
153 conditionName = "Conditional" + std::to_string(conditionCount);
154
155 registerAsyncHandler(*conditionName, [=, this, &returnValue](const ObjectType &itemToBeProcessed) -> bool {
156 if (condition && condition(itemToBeProcessed)) {
157 returnValue = itemToBeProcessed;
158 handleSyncCV.notify_all();
159 }
160 return false;
161 });
162
163 handleSyncCV.wait_for(syncLock, maxWait_ms, [&]() -> bool {
164 return returnValue.has_value();
165 });
166
167 unregisterHandler(*conditionName);
168 conditionCount++;
169
170 return returnValue;
171 }
172
179 AsyncPipeline<ObjectType> &unregisterHandler(const std::string &handlerName) {
180 auto guardHandle = std::lock_guard(handleMutex);
181 if (handlers.contains(handlerName)) {
182 handlers.erase(handlerName);
183 }
184
185 return *this;
186 }
187
194 bool hasHandler(const std::string &handlerName) {
195 return handlers.contains(handlerName);
196 }
197
203 [[nodiscard]] bool isPipelineRunning() const {
204 return pipelineState;
205 }
206
212 [[nodiscard]] bool isPipelinePaused() const {
213 return pipelinePaused;
214 }
215
216private:
218 bool pipelineState = false;
219
221 bool pipelinePaused = false;
222
223 std::mutex bufferMutex, handleMutex, waitMutex, sendSyncMutex, handleSyncMutex, pauseMutex;
224 std::condition_variable bufferCV, syncSendCV, handleSyncCV, pauseCV;
225
227 std::deque<ObjectType> buffer;
228
229 std::deque<uint64_t> objectTrackingIdQueue;
230 uint64_t currentTrackingId{0UL};
231
233 std::map<std::string, std::function<void(const ObjectType &object)>> handlers;
234
241 void serviceLoop() {
242 auto lock = std::unique_lock(waitMutex);
243 auto pauseLock = std::unique_lock(pauseMutex);
244 while (pipelineState) {
245 bufferCV.wait_for(lock, 50ms, [&] {
246 return pipelineState && !buffer.empty();
247 });
248
249 pauseCV.wait(pauseLock, [&] {
250 return !pipelinePaused;
251 });
252
253 if (pipelineState && !buffer.empty() && !pipelinePaused) {
254 const auto &front = buffer.front();
255 auto guardHandle = std::lock_guard(handleMutex);
256 std::for_each(handlers.cbegin(), handlers.cend(), [&](const auto &handle) {
257 handle.second(front);
258 });
259
260 auto guard = std::lock_guard(bufferMutex);
261 buffer.pop_front();
262 objectTrackingIdQueue.pop_front();
263 }
264 }
265 }
266};
267
268#endif //PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
Template class implementing an asynchronous processing pipeline.
bool isPipelineRunning() const
Checks if the pipeline is currently running.
void stopService()
Stops the pipeline service.
std::optional< ObjectType > selectDataByCondition(const std::function< bool(const ObjectType &)> &condition, const std::chrono::milliseconds &maxWait_ms, std::optional< std::string > conditionName)
Selects data from the pipeline based on a condition.
int resume()
Resumes the pipeline processing.
AsyncPipeline< ObjectType > & unregisterHandler(const std::string &handlerName)
Unregisters a handler from the pipeline.
bool hasHandler(const std::string &handlerName)
Checks if a handler exists in the pipeline.
uint64_t send(T &&object)
Sends an object to the pipeline for asynchronous processing.
AsyncPipeline< ObjectType > & registerAsyncHandler(std::string handlerName, std::function< void(const ObjectType &object)> handler)
Registers an asynchronous handler for processing objects.
int pause()
Pauses the pipeline processing.
AsyncPipeline & startService()
Starts the pipeline service.
uint64_t sendSync(T &&object)
Sends an object to the pipeline and waits for processing completion.
bool isPipelinePaused() const
Checks if the pipeline is currently paused.
static T & getInstance(Args... args)
Gets the singleton instance of the class.
Definition Singleton.hxx:36