9 #include <condition_variable>
18 #include <opencv2/core/core.hpp>
// Shared-ownership handle type used to pass VideoFrame objects around.
24 typedef std::shared_ptr<VideoFrame> Ptr;
// Builds a frame record from a source identifier, a frame identifier and an
// image. When the caller omits `frame`, a default-constructed cv::Mat is used.
// `timestamp` is not initialised here.
26 VideoFrame(
unsigned sourceID, int64_t frameId,
const cv::Mat& frame = cv::Mat()) :
27 sourceID{sourceID}, frameId{frameId}, frame{frame} {}
// Source identifier; const, so it is fixed at construction.
30 const unsigned sourceID;
// Frame identifier; const, so it is fixed at construction. TaskComparator uses
// it as the tie-breaker between tasks of equal priority.
31 const int64_t frameId;
// Time point attached to the frame. Where it is assigned is not visible in
// this listing -- TODO confirm against the callers.
34 PerformanceMetrics::TimePoint timestamp;
// Creates a task bound to a shared video frame.
//   sharedVideoFrame - frame handle stored in the task (copied into the member).
//   priority         - scheduling weight, 0 by default; TaskComparator treats a
//                      larger value as "comes first".
// `explicit` prevents an implicit conversion from VideoFrame::Ptr to Task.
41 explicit Task(VideoFrame::Ptr sharedVideoFrame,
float priority = 0):
42 sharedVideoFrame{sharedVideoFrame}, priority{priority} {}
// Pure virtual readiness check. The worker loop uses it as the predicate when
// it searches the queued tasks for one to pick.
43 virtual bool isReady() = 0;
// Pure virtual unit of work supplied by the concrete task. Presumably invoked
// by the worker on a task whose isReady() returned true -- the call site is
// not part of this listing, confirm there.
44 virtual void process() = 0;
// Virtual destructor so that tasks can be destroyed through a Task pointer.
45 virtual ~
Task() =
default;
// Frame this task operates on; set by the constructor.
48 VideoFrame::Ptr sharedVideoFrame;
// Ordering predicate over task handles. Returns true when `lhs` should be
// ordered before `rhs`, decided by three keys in turn:
//   1. higher priority first;
//   2. on equal priority, the smaller frameId first;
//   3. on equal priority and frameId, the shared_ptr values themselves are
//      compared (`lhs < rhs`, i.e. by stored pointer), so two distinct task
//      objects never compare equivalent.
// Both handles and their sharedVideoFrame are dereferenced without a null check.
53 bool operator()(
const std::shared_ptr<Task>& lhs,
const std::shared_ptr<Task>& rhs)
const {
54 return lhs->priority > rhs->priority
55 || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId < rhs->sharedVideoFrame->frameId)
56 || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId == rhs->sharedVideoFrame->frameId && lhs < rhs);
// Prepares a worker with `threadNum` pool slots. The slots are
// default-constructed std::thread objects (no thread is started here) and the
// `running` flag starts out false.
62 explicit Worker(
unsigned threadNum):
63 threadPool(threadNum), running{
false} {}
// Fragment: the enclosing method's signature is not part of this listing.
// Each pool slot is replaced with a live thread executing Worker::threadFunc
// on this object.
69 for (std::thread& t : threadPool) {
70 t = std::thread(&Worker::threadFunc,
this);
// Hands a task to the worker. The lines between the signature and the notify
// are missing from this listing; presumably they insert `task` into `tasks`
// under tasksMutex -- confirm against the full header.
73 void push(std::shared_ptr<Task> task) {
// Wake one thread blocked on tasksCondVar.
77 tasksCondVar.notify_one();
// Fragment of the worker loop: the enclosing signature and several interior
// lines are missing from this listing, so only the visible statements are
// described.
// Hold tasksMutex while the task collection is inspected.
81 std::unique_lock<std::mutex> lk(tasksMutex);
// Block while the worker is running and nothing is queued. The loop re-checks
// the condition after every wake-up, so spurious wake-ups are harmless.
82 while (running && tasks.empty()) {
83 tasksCondVar.wait(lk);
// Pick the first queued task, in the container's iteration order, whose
// isReady() returns true.
86 auto it = std::find_if(tasks.begin(), tasks.end(), [](
const std::shared_ptr<Task>& task){return task->isReady();});
87 if (tasks.end() != it) {
// Take the handle out of the found element. Its removal from `tasks` and the
// call that runs it are not visible here.
88 const std::shared_ptr<Task> task = std::move(*it);
// Serialise access to currentException across worker threads.
94 std::lock_guard<std::mutex> lock{exceptionMutex};
// Only the first captured exception is kept; later ones are dropped.
95 if (
nullptr == currentException) {
96 currentException = std::current_exception();
// Wake every thread waiting on tasksCondVar.
104 tasksCondVar.notify_all();
// Fragment: the enclosing method's signature is not part of this listing.
// Visits every thread in the pool; the loop body is not visible here
// (presumably it joins the thread -- confirm).
107 for (
auto& t : threadPool) {
// If a worker thread stored an exception, rethrow it on the calling thread.
110 if (
nullptr != currentException) {
111 std::rethrow_exception(currentException);
// Signalled by push() (notify_one) and by the worker loop (notify_all);
// worker threads wait on it while the queue is empty.
116 std::condition_variable tasksCondVar;
// Locked by the worker loop around its inspection of `tasks`.
118 std::mutex tasksMutex;
// Thread slots; sized in the constructor and filled with running threads later.
119 std::vector<std::thread> threadPool;
// Atomic run flag; initialised to false and read in the worker wait loop.
120 std::atomic<bool> running;
// First exception captured from a worker thread; null while none has occurred.
121 std::exception_ptr currentException;
// Guards writes to currentException made from worker threads.
122 std::mutex exceptionMutex;
// Pushes `task` to the worker behind `worker` if that worker still exists.
// Constructing a std::shared_ptr from an expired std::weak_ptr throws
// std::bad_weak_ptr; that exception is caught and ignored, so the task is
// silently dropped when the worker is gone. The `try {` line is missing from
// this listing.
125 void tryPush(
const std::weak_ptr<Worker>& worker, std::shared_ptr<Task>&& task) {
// NOTE(review): `task` is a named rvalue reference, so it is passed as an
// lvalue and push() receives a copy of the handle, not a moved one.
127 std::shared_ptr<Worker>(worker)->push(task);
128 }
catch (
const std::bad_weak_ptr&) {}
// `mutable` so that the const accessors below can still lock it.
134 mutable std::mutex mutex;
// Returns whether the container is empty, read while holding the mutex. The
// answer may be stale as soon as the lock is released.
// NOTE(review): declared noexcept, but locking a std::mutex may throw
// std::system_error, which would call std::terminate here.
136 bool lockedEmpty()
const noexcept {
137 std::lock_guard<std::mutex> lock{mutex};
138 return container.empty();
// Returns the container's element count, read while holding the mutex. The
// value may be stale as soon as the lock is released.
// NOTE(review): declared noexcept, but locking a std::mutex may throw
// std::system_error, which would call std::terminate here.
140 typename C::size_type lockedSize()
const noexcept {
141 std::lock_guard<std::mutex> lock{mutex};
142 return container.size();
// Appends a copy of `value` to the end of the container while holding the mutex.
144 void lockedPushBack(
const typename C::value_type& value) {
145 std::lock_guard<std::mutex> lock{mutex};
146 container.push_back(value);
// Tries to remove one element into `value`. The element is taken from the back
// of the container, the same end lockedPushBack() appends to, so the most
// recently pushed element comes out first. The lock acquisition and the return
// statements are missing from this listing; presumably it returns true when an
// element was taken and false when the container was empty -- confirm.
148 bool lockedTryPop(
typename C::value_type& value) {
150 if (!container.empty()) {
// Copy-assign the last element to the caller's variable, then drop it.
151 value = container.back();
152 container.pop_back();
162 std::lock_guard<std::mutex> lock{mutex};
Definition: threads_common.hpp:131
Definition: threads_common.hpp:39
Definition: threads_common.hpp:22
Definition: threads_common.hpp:60
Definition: threads_common.hpp:52