19 pipelinePaused =
false;
20 std::this_thread::sleep_for(1ms);
31 pipelineState =
false;
32 pipelinePaused =
false;
37 objectTrackingIdQueue.clear();
40 std::this_thread::sleep_for(200ms);
45 uint64_t
send(T&&
object) {
46 std::lock_guard lock(bufferMutex);
47 buffer.emplace_back(std::forward<T>(
object));
48 auto trackingId = currentTrackingId;
49 objectTrackingIdQueue.emplace_back(currentTrackingId++);
50 bufferCV.notify_all();
57 auto trackingId =
send(std::forward<T>(
object));
58 std::unique_lock syncLock(sendSyncMutex);
59 while (pipelineState) {
60 syncSendCV.wait_for(syncLock, 1ms, [&]() {
61 return std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end();
64 if (std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end())
71 pipelinePaused =
true;
77 pipelinePaused =
false;
83 auto guardHandle = std::lock_guard(handleMutex);
84 handlers[handlerName] = handler;
89 std::optional<ObjectType>
selectDataByCondition(
const std::function<
bool(
const ObjectType &)> &condition,
const std::chrono::milliseconds &maxWait_ms, std::optional<std::string> conditionName) {
90 thread_local static uint64_t conditionCount = 0;
91 std::optional<ObjectType> returnValue;
92 std::unique_lock syncLock(handleSyncMutex);
94 conditionName =
"Conditional" + std::to_string(conditionCount);
96 registerAsyncHandler(*conditionName, [=,
this, &returnValue](
const ObjectType &itemToBeProcessed) ->
bool {
97 if (condition && condition(itemToBeProcessed)) {
98 returnValue = itemToBeProcessed;
99 handleSyncCV.notify_all();
104 handleSyncCV.wait_for(syncLock, maxWait_ms, [&]() ->
bool {
105 return returnValue.has_value();
115 auto guardHandle = std::lock_guard(handleMutex);
116 if (handlers.contains(handlerName)) {
117 handlers.erase(handlerName);
124 return handlers.contains(handlerName);
128 return pipelineState;
132 return pipelinePaused;
136 bool pipelineState =
false, pipelinePaused =
false;
137 std::mutex bufferMutex, handleMutex, waitMutex, sendSyncMutex, handleSyncMutex, pauseMutex;
138 std::condition_variable bufferCV, syncSendCV, handleSyncCV, pauseCV;
139 std::deque<ObjectType> buffer;
140 std::deque<uint64_t> objectTrackingIdQueue;
141 uint64_t currentTrackingId{0UL};
143 std::map<std::string, std::function<void(
const ObjectType &
object)>> handlers;
146 auto lock = std::unique_lock(waitMutex);
147 auto pauseLock = std::unique_lock(pauseMutex);
148 while (pipelineState) {
149 bufferCV.wait_for(lock, 50ms, [&] {
150 return pipelineState && !buffer.empty();
153 pauseCV.wait(pauseLock, [&] {
154 return !pipelinePaused;
157 if (pipelineState && !buffer.empty() && !pipelinePaused) {
158 const auto &front = buffer.front();
159 auto guardHandle = std::lock_guard(handleMutex);
160 std::for_each(handlers.cbegin(), handlers.cend(), [&](
const auto &handle) {
161 handle.second(front);
164 auto guard = std::lock_guard(bufferMutex);
166 objectTrackingIdQueue.pop_front();