plugin  0.1.0
input_wrappers.hpp
1 // Copyright (C) 2018-2024 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 #pragma once
6 
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <stdexcept>
#include <thread>
#include <vector>

#include <opencv2/opencv.hpp>
15 
16 class InputChannel;
17 
18 class IInputSource {
19 public:
20  virtual bool read(cv::Mat& mat, const std::shared_ptr<InputChannel>& caller) = 0;
21  virtual void addSubscriber(const std::weak_ptr<InputChannel>& inputChannel) = 0;
22  virtual cv::Size getSize() = 0;
23  virtual void lock() {
24  sourceLock.lock();
25  }
26  virtual void unlock() {
27  sourceLock.unlock();
28  }
29  virtual ~IInputSource() = default;
30 private:
31  std::mutex sourceLock;
32 };
33 
34 class InputChannel: public std::enable_shared_from_this<InputChannel> { // note: public inheritance
35 public:
36  InputChannel(const InputChannel&) = delete;
37  InputChannel& operator=(const InputChannel&) = delete;
38  static std::shared_ptr<InputChannel> create(const std::shared_ptr<IInputSource>& source) {
39  auto tmp = std::shared_ptr<InputChannel>(new InputChannel(source));
40  source->addSubscriber(tmp);
41  return tmp;
42  }
43  bool read(cv::Mat& mat) {
44  readQueueMutex.lock();
45  if (readQueue.empty()) {
46  readQueueMutex.unlock();
47  source->lock();
48  readQueueMutex.lock();
49  if (readQueue.empty()) {
50  bool res = source->read(mat, shared_from_this());
51  readQueueMutex.unlock();
52  source->unlock();
53  return res;
54  } else {
55  source->unlock();
56  }
57  }
58  mat = readQueue.front().clone();
59  readQueue.pop();
60  readQueueMutex.unlock();
61  return true;
62  }
63  void push(const cv::Mat& mat) {
64  readQueueMutex.lock();
65  readQueue.push(mat);
66  readQueueMutex.unlock();
67  }
68  cv::Size getSize() {
69  return source->getSize();
70  }
71 
72 private:
73  explicit InputChannel(const std::shared_ptr<IInputSource>& source): source{source} {}
74  std::shared_ptr<IInputSource> source;
75  std::queue<cv::Mat, std::list<cv::Mat>> readQueue;
76  std::mutex readQueueMutex;
77 };
78 
80 public:
81  VideoCaptureSource(const cv::VideoCapture& videoCapture, bool loop): videoCapture{videoCapture}, loop{loop},
82  imSize{static_cast<int>(videoCapture.get(cv::CAP_PROP_FRAME_WIDTH)), static_cast<int>(videoCapture.get(cv::CAP_PROP_FRAME_HEIGHT))} {}
83  bool read(cv::Mat& mat, const std::shared_ptr<InputChannel>& caller) override {
84  if (!videoCapture.read(mat)) {
85  if (loop) {
86  videoCapture.set(cv::CAP_PROP_POS_FRAMES, 0);
87  videoCapture.read(mat);
88  } else {
89  return false;
90  }
91  }
92  if (1 != subscribedInputChannels.size()) {
93  cv::Mat shared = mat.clone();
94  for (const std::weak_ptr<InputChannel>& weakInputChannel : subscribedInputChannels) {
95  try {
96  std::shared_ptr<InputChannel> sharedInputChannel = std::shared_ptr<InputChannel>(weakInputChannel);
97  if (caller != sharedInputChannel) {
98  sharedInputChannel->push(shared);
99  }
100  } catch (const std::bad_weak_ptr&) {}
101  }
102  }
103  return true;
104  }
105  void addSubscriber(const std::weak_ptr<InputChannel>& inputChannel) override {
106  subscribedInputChannels.push_back(inputChannel);
107  }
108  cv::Size getSize() override {
109  return imSize;
110  }
111 
112 private:
113  std::vector<std::weak_ptr<InputChannel>> subscribedInputChannels;
114  cv::VideoCapture videoCapture;
115  bool loop;
116  cv::Size imSize;
117 };
118 
119 class ImageSource: public IInputSource {
120 public:
121  ImageSource(const cv::Mat& im, bool loop): im{im.clone()}, loop{loop} {} // clone to avoid image changing
122  bool read(cv::Mat& mat, const std::shared_ptr<InputChannel>& caller) override {
123  if (!loop) {
124  auto subscribedInputChannelsIt = subscribedInputChannels.find(caller);
125  if (subscribedInputChannels.end() == subscribedInputChannelsIt) {
126  return false;
127  } else {
128  subscribedInputChannels.erase(subscribedInputChannelsIt);
129  mat = im;
130  return true;
131  }
132  } else {
133  mat = im;
134  return true;
135  }
136  }
137  void addSubscriber(const std::weak_ptr<InputChannel>& inputChannel) override {
138  if (false == subscribedInputChannels.insert(inputChannel).second)
139  throw std::invalid_argument("The insertion did not take place");
140  }
141  cv::Size getSize() override {
142  return im.size();
143  }
144 
145 private:
146  std::set<std::weak_ptr<InputChannel>, std::owner_less<std::weak_ptr<InputChannel>>> subscribedInputChannels;
147  cv::Mat im;
148  bool loop;
149 };
Definition: input_wrappers.hpp:18
Definition: input_wrappers.hpp:119
Definition: input_wrappers.hpp:34
Definition: input_wrappers.hpp:79