plugin  0.1.0
threads_common.hpp
1 // Copyright (C) 2018-2024 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 #pragma once
6 
7 #include <algorithm>
8 #include <atomic>
9 #include <condition_variable>
10 #include <memory>
11 #include <mutex>
12 #include <utility>
13 #include <set>
14 #include <string>
15 #include <thread>
16 #include <vector>
17 
18 #include <opencv2/core/core.hpp>
20 
21 // VideoFrame can represent not a single image but the whole grid
22 class VideoFrame {
23 public:
24  typedef std::shared_ptr<VideoFrame> Ptr;
25 
26  VideoFrame(unsigned sourceID, int64_t frameId, const cv::Mat& frame = cv::Mat()) :
27  sourceID{sourceID}, frameId{frameId}, frame{frame} {}
28  virtual ~VideoFrame() = default; // A user has to define how it is reconstructed
29 
30  const unsigned sourceID;
31  const int64_t frameId;
32  cv::Mat frame;
33 
34  PerformanceMetrics::TimePoint timestamp;
35 };
36 
37 class Worker;
38 
39 class Task {
40 public:
41  explicit Task(VideoFrame::Ptr sharedVideoFrame, float priority = 0):
42  sharedVideoFrame{sharedVideoFrame}, priority{priority} {}
43  virtual bool isReady() = 0;
44  virtual void process() = 0;
45  virtual ~Task() = default;
46 
47  std::string name;
48  VideoFrame::Ptr sharedVideoFrame; // it is possible that two tasks try to draw on the same cvMat
49  const float priority;
50 };
51 
53  bool operator()(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>& rhs) const {
54  return lhs->priority > rhs->priority
55  || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId < rhs->sharedVideoFrame->frameId)
56  || (lhs->priority == rhs->priority && lhs->sharedVideoFrame->frameId == rhs->sharedVideoFrame->frameId && lhs < rhs);
57  }
58 };
59 
60 class Worker {
61 public:
62  explicit Worker(unsigned threadNum):
63  threadPool(threadNum), running{false} {}
64  ~Worker() {
65  stop();
66  }
67  void runThreads() {
68  running = true;
69  for (std::thread& t : threadPool) {
70  t = std::thread(&Worker::threadFunc, this);
71  }
72  }
73  void push(std::shared_ptr<Task> task) {
74  tasksMutex.lock();
75  tasks.insert(task);
76  tasksMutex.unlock();
77  tasksCondVar.notify_one();
78  }
79  void threadFunc() {
80  while (running) {
81  std::unique_lock<std::mutex> lk(tasksMutex);
82  while (running && tasks.empty()) {
83  tasksCondVar.wait(lk);
84  }
85  try {
86  auto it = std::find_if(tasks.begin(), tasks.end(), [](const std::shared_ptr<Task>& task){return task->isReady();});
87  if (tasks.end() != it) {
88  const std::shared_ptr<Task> task = std::move(*it);
89  tasks.erase(it);
90  lk.unlock();
91  task->process();
92  }
93  } catch (...) {
94  std::lock_guard<std::mutex> lock{exceptionMutex};
95  if (nullptr == currentException) {
96  currentException = std::current_exception();
97  stop();
98  }
99  }
100  }
101  }
102  void stop() {
103  running = false;
104  tasksCondVar.notify_all();
105  }
106  void join() {
107  for (auto& t : threadPool) {
108  t.join();
109  }
110  if (nullptr != currentException) {
111  std::rethrow_exception(currentException);
112  }
113  }
114 
115 private:
116  std::condition_variable tasksCondVar;
117  std::set<std::shared_ptr<Task>, HigherPriority> tasks;
118  std::mutex tasksMutex;
119  std::vector<std::thread> threadPool;
120  std::atomic<bool> running;
121  std::exception_ptr currentException;
122  std::mutex exceptionMutex;
123 };
124 
125 void tryPush(const std::weak_ptr<Worker>& worker, std::shared_ptr<Task>&& task) {
126  try {
127  std::shared_ptr<Worker>(worker)->push(task);
128  } catch (const std::bad_weak_ptr&) {}
129 }
130 
131 template <class C> class ConcurrentContainer {
132 public:
133  C container;
134  mutable std::mutex mutex;
135 
136  bool lockedEmpty() const noexcept {
137  std::lock_guard<std::mutex> lock{mutex};
138  return container.empty();
139  }
140  typename C::size_type lockedSize() const noexcept {
141  std::lock_guard<std::mutex> lock{mutex};
142  return container.size();
143  }
144  void lockedPushBack(const typename C::value_type& value) {
145  std::lock_guard<std::mutex> lock{mutex};
146  container.push_back(value);
147  }
148  bool lockedTryPop(typename C::value_type& value) {
149  mutex.lock();
150  if (!container.empty()) {
151  value = container.back();
152  container.pop_back();
153  mutex.unlock();
154  return true;
155  } else {
156  mutex.unlock();
157  return false;
158  }
159  }
160 
161  operator C() const {
162  std::lock_guard<std::mutex> lock{mutex};
163  return container;
164  }
165 };
Definition: threads_common.hpp:131
Definition: threads_common.hpp:39
Definition: threads_common.hpp:22
Definition: threads_common.hpp:60
a header file for performance metrics calculation class
Definition: threads_common.hpp:52