5#ifndef PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
6#define PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
11using namespace std::chrono_literals;
23template<
typename ObjectType>
35 pipelinePaused =
false;
36 std::this_thread::sleep_for(1ms);
53 pipelineState =
false;
54 pipelinePaused =
false;
59 objectTrackingIdQueue.clear();
62 std::this_thread::sleep_for(200ms);
73 uint64_t
send(T&&
object) {
74 std::lock_guard lock(bufferMutex);
75 buffer.emplace_back(std::forward<T>(
object));
76 auto trackingId = currentTrackingId;
77 objectTrackingIdQueue.emplace_back(currentTrackingId++);
78 bufferCV.notify_all();
91 auto trackingId =
send(std::forward<T>(
object));
92 std::unique_lock syncLock(sendSyncMutex);
93 while (pipelineState) {
94 syncSendCV.wait_for(syncLock, 1ms, [&]() {
95 return std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end();
98 if (std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end())
110 pipelinePaused =
true;
111 pauseCV.notify_all();
121 pipelinePaused =
false;
122 pauseCV.notify_all();
134 auto guardHandle = std::lock_guard(handleMutex);
135 handlers[handlerName] = handler;
148 std::optional<ObjectType>
selectDataByCondition(
const std::function<
bool(
const ObjectType &)> &condition,
const std::chrono::milliseconds &maxWait_ms, std::optional<std::string> conditionName) {
149 thread_local static uint64_t conditionCount = 0;
150 std::optional<ObjectType> returnValue;
151 std::unique_lock syncLock(handleSyncMutex);
153 conditionName =
"Conditional" + std::to_string(conditionCount);
155 registerAsyncHandler(*conditionName, [=,
this, &returnValue](
const ObjectType &itemToBeProcessed) ->
bool {
156 if (condition && condition(itemToBeProcessed)) {
157 returnValue = itemToBeProcessed;
158 handleSyncCV.notify_all();
163 handleSyncCV.wait_for(syncLock, maxWait_ms, [&]() ->
bool {
164 return returnValue.has_value();
180 auto guardHandle = std::lock_guard(handleMutex);
181 if (handlers.contains(handlerName)) {
182 handlers.erase(handlerName);
195 return handlers.contains(handlerName);
204 return pipelineState;
213 return pipelinePaused;
218 bool pipelineState =
false;
221 bool pipelinePaused =
false;
223 std::mutex bufferMutex, handleMutex, waitMutex, sendSyncMutex, handleSyncMutex, pauseMutex;
224 std::condition_variable bufferCV, syncSendCV, handleSyncCV, pauseCV;
227 std::deque<ObjectType> buffer;
229 std::deque<uint64_t> objectTrackingIdQueue;
230 uint64_t currentTrackingId{0UL};
233 std::map<std::string, std::function<void(
const ObjectType &
object)>> handlers;
242 auto lock = std::unique_lock(waitMutex);
243 auto pauseLock = std::unique_lock(pauseMutex);
244 while (pipelineState) {
245 bufferCV.wait_for(lock, 50ms, [&] {
246 return pipelineState && !buffer.empty();
249 pauseCV.wait(pauseLock, [&] {
250 return !pipelinePaused;
253 if (pipelineState && !buffer.empty() && !pipelinePaused) {
254 const auto &front = buffer.front();
255 auto guardHandle = std::lock_guard(handleMutex);
256 std::for_each(handlers.cbegin(), handlers.cend(), [&](
const auto &handle) {
257 handle.second(front);
260 auto guard = std::lock_guard(bufferMutex);
262 objectTrackingIdQueue.pop_front();
Template class implementing an asynchronous processing pipeline.
bool isPipelineRunning() const
Checks if the pipeline is currently running.
void stopService()
Stops the pipeline service.
std::optional< ObjectType > selectDataByCondition(const std::function< bool(const ObjectType &)> &condition, const std::chrono::milliseconds &maxWait_ms, std::optional< std::string > conditionName)
Selects data from the pipeline based on a condition.
int resume()
Resumes the pipeline processing.
AsyncPipeline< ObjectType > & unregisterHandler(const std::string &handlerName)
Unregisters a handler from the pipeline.
bool hasHandler(const std::string &handlerName)
Checks if a handler exists in the pipeline.
uint64_t send(T &&object)
Sends an object to the pipeline for asynchronous processing.
AsyncPipeline< ObjectType > & registerAsyncHandler(std::string handlerName, std::function< void(const ObjectType &object)> handler)
Registers an asynchronous handler for processing objects.
int pause()
Pauses the pipeline processing.
AsyncPipeline & startService()
Starts the pipeline service.
uint64_t sendSync(T &&object)
Sends an object to the pipeline and waits for processing completion.
bool isPipelinePaused() const
Checks if the pipeline is currently paused.
static T & getInstance(Args... args)
Gets the singleton instance of the class.