PicoScenes API Docs
 
Loading...
Searching...
No Matches
AsyncPipeline.hxx
Go to the documentation of this file.
1//
2// Created by Zhiping Jiang on 22-7-21.
3//
4
5#ifndef PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
6#define PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
7
8#include "SystemTools.hxx"
10
11using namespace std::chrono_literals;
12
13template<typename ObjectType>
15public:
16
18 pipelineState = true;
19 pipelinePaused = false;
20 std::this_thread::sleep_for(1ms);
22 serviceLoop();
23 });
24 return *this;
25 }
26
27 void stopService() {
28 if (!pipelineState)
29 return;
30
31 pipelineState = false;
32 pipelinePaused = false;
33 bufferMutex.lock();
34 handleMutex.lock();
35 handlers.clear();
36 buffer.clear();
37 objectTrackingIdQueue.clear();
38 bufferMutex.unlock();
39 handleMutex.unlock();
40 std::this_thread::sleep_for(200ms);
41 }
42
43 // Use a different typename to activate perfect forwarding
44 template<typename T>
45 uint64_t send(T&& object) {
46 std::lock_guard lock(bufferMutex);
47 buffer.emplace_back(std::forward<T>(object));
48 auto trackingId = currentTrackingId;
49 objectTrackingIdQueue.emplace_back(currentTrackingId++);
50 bufferCV.notify_all();
51 return trackingId;
52 }
53
54 // Use a different typename to activate perfect forwarding
55 template<typename T>
56 uint64_t sendSync(T&& object) {
57 auto trackingId = send(std::forward<T>(object));
58 std::unique_lock syncLock(sendSyncMutex);
59 while (pipelineState) {
60 syncSendCV.wait_for(syncLock, 1ms, [&]() {
61 return std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end();
62 });
63
64 if (std::find(objectTrackingIdQueue.begin(), objectTrackingIdQueue.end(), trackingId) == objectTrackingIdQueue.end())
65 break;
66 }
67 return trackingId;
68 }
69
70 int pause() {
71 pipelinePaused = true;
72 pauseCV.notify_all();
73 return 0;
74 }
75
76 int resume() {
77 pipelinePaused = false;
78 pauseCV.notify_all();
79 return 0;
80 }
81
82 AsyncPipeline<ObjectType> &registerAsyncHandler(std::string handlerName, std::function<void(const ObjectType &object)> handler) {
83 auto guardHandle = std::lock_guard(handleMutex);
84 handlers[handlerName] = handler;
85
86 return *this;
87 }
88
89 std::optional<ObjectType> selectDataByCondition(const std::function<bool(const ObjectType &)> &condition, const std::chrono::milliseconds &maxWait_ms, std::optional<std::string> conditionName) {
90 thread_local static uint64_t conditionCount = 0;
91 std::optional<ObjectType> returnValue;
92 std::unique_lock syncLock(handleSyncMutex);
93 if (!conditionName)
94 conditionName = "Conditional" + std::to_string(conditionCount);
95
96 registerAsyncHandler(*conditionName, [=, this, &returnValue](const ObjectType &itemToBeProcessed) -> bool {
97 if (condition && condition(itemToBeProcessed)) {
98 returnValue = itemToBeProcessed;
99 handleSyncCV.notify_all();
100 }
101 return false;
102 });
103
104 handleSyncCV.wait_for(syncLock, maxWait_ms, [&]() -> bool {
105 return returnValue.has_value();
106 });
107
108 unregisterHandler(*conditionName);
109 conditionCount++;
110
111 return returnValue;
112 }
113
114 AsyncPipeline<ObjectType> &unregisterHandler(const std::string &handlerName) {
115 auto guardHandle = std::lock_guard(handleMutex);
116 if (handlers.contains(handlerName)) {
117 handlers.erase(handlerName);
118 }
119
120 return *this;
121 }
122
123 bool hasHandler(const std::string &handlerName) {
124 return handlers.contains(handlerName);
125 }
126
127 [[nodiscard]] bool isPipelineRunning() const {
128 return pipelineState;
129 }
130
131 [[nodiscard]] bool isPipelinePaused() const {
132 return pipelinePaused;
133 }
134
135private:
136 bool pipelineState = false, pipelinePaused = false;
137 std::mutex bufferMutex, handleMutex, waitMutex, sendSyncMutex, handleSyncMutex, pauseMutex;
138 std::condition_variable bufferCV, syncSendCV, handleSyncCV, pauseCV;
139 std::deque<ObjectType> buffer;
140 std::deque<uint64_t> objectTrackingIdQueue;
141 uint64_t currentTrackingId{0UL};
142
143 std::map<std::string, std::function<void(const ObjectType &object)>> handlers;
144
145 void serviceLoop() {
146 auto lock = std::unique_lock(waitMutex);
147 auto pauseLock = std::unique_lock(pauseMutex);
148 while (pipelineState) {
149 bufferCV.wait_for(lock, 50ms, [&] {
150 return pipelineState && !buffer.empty();
151 });
152
153 pauseCV.wait(pauseLock, [&] {
154 return !pipelinePaused;
155 });
156
157 if (pipelineState && !buffer.empty() && !pipelinePaused) {
158 const auto &front = buffer.front();
159 auto guardHandle = std::lock_guard(handleMutex);
160 std::for_each(handlers.cbegin(), handlers.cend(), [&](const auto &handle) {
161 handle.second(front);
162 });
163
164 auto guard = std::lock_guard(bufferMutex);
165 buffer.pop_front();
166 objectTrackingIdQueue.pop_front();
167 }
168 }
169 }
170};
171
172#endif //PICOSCENES_PLATFORM_ASYNCPIPELINE_HXX
bool isPipelineRunning() const
std::optional< ObjectType > selectDataByCondition(const std::function< bool(const ObjectType &)> &condition, const std::chrono::milliseconds &maxWait_ms, std::optional< std::string > conditionName)
AsyncPipeline< ObjectType > & unregisterHandler(const std::string &handlerName)
bool hasHandler(const std::string &handlerName)
uint64_t send(T &&object)
AsyncPipeline< ObjectType > & registerAsyncHandler(std::string handlerName, std::function< void(const ObjectType &object)> handler)
AsyncPipeline & startService()
uint64_t sendSync(T &&object)
bool isPipelinePaused() const
static T & getInstance(Args... args)
Definition Singleton.hxx:17