Forráskód Böngészése

队列添加最大长度,溢出策略:丢弃当前帧、阻塞、丢弃最旧帧

leon 1 hónapja
szülő
commit
d0351f89e6

+ 179 - 17
src/common/queue.hpp

@@ -4,36 +4,157 @@
 #include <queue>
 #include <mutex>
 #include <condition_variable>
+#include <functional> // 用于 std::function
+#include <limits>     // 用于 std::numeric_limits
+#include <stdexcept>  // 用于 std::invalid_argument
+
+/**
+ * @brief 定义队列满时的处理策略
+ */
+enum class OverflowStrategy {
+    Block,         //!< 阻塞生产者,直到队列有空间
+    Discard,       //!< 丢弃尝试推入的新元素,立即返回
+    DiscardOldest  //!< 丢弃队列中最旧的元素(队首),然后推入新元素
+};
 
 template <typename T>
 class SharedQueue {
 public:
-    SharedQueue() = default;
+    /**
+     * @brief 构造一个具有指定最大大小和溢出策略的 SharedQueue。
+     * @param max_size 队列可以容纳的最大项目数。
+     *                 默认为 std::numeric_limits<size_t>::max(),表示近乎无限大。
+     *                 max_size 不能为 0。
+     * @param strategy 当队列满时采取的策略 (OverflowStrategy::Block, OverflowStrategy::Discard, 或 OverflowStrategy::DiscardOldest)。
+     *                 默认为 OverflowStrategy::Block。
+     */
+    explicit SharedQueue(size_t max_size = 100,
+                         OverflowStrategy strategy = OverflowStrategy::Block)
+        : max_size_(max_size), strategy_(strategy)
+    {
+        if (max_size_ == 0) {
+            throw std::invalid_argument("SharedQueue 的 max_size 不能为 0");
+        }
+    }
 
-    void set_push_callback(std::function<void()> callback) 
+    // 为了线程安全,删除拷贝构造函数和赋值操作符
+    SharedQueue(const SharedQueue&) = delete;
+    SharedQueue& operator=(const SharedQueue&) = delete;
+
+    /**
+     * @brief 设置一个回调函数,在元素成功推入队列后调用。
+     * 注意:回调函数将在持有队列互斥锁的情况下被调用。
+     * @param callback 要调用的函数。
+     */
+    void set_push_callback(std::function<void()> callback)
     {
         std::lock_guard<std::mutex> lock(mutex_);
         push_callback_ = callback;
     }
 
-    void push(const T& item) {
-        std::lock_guard<std::mutex> lock(mutex_);
+    /**
+     * @brief 将一个元素推入队列 (拷贝方式)。
+     * 根据构造时设置的策略,在队列满时可能阻塞、丢弃新元素或丢弃旧元素。
+     * @param item 要推入的元素。
+     * @return 如果新元素成功进入队列(对于Block和DiscardOldest策略总是true,对于Discard策略在未满时为true),则返回 true;
+     *         如果策略为Discard且队列已满导致新元素被丢弃,则返回 false。
+     */
+    bool push(const T& item) {
+        std::unique_lock<std::mutex> lock(mutex_);
+
+        // 检查队列是否已满,并根据策略处理
+        if (queue_.size() >= max_size_) {
+            switch (strategy_) {
+                case OverflowStrategy::Block:
+                    // 阻塞策略:等待直到队列不再满
+                    cond_var_not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
+                    // 等待结束后,队列保证有空间
+                    break; // 跳出 switch,继续执行 push
+
+                case OverflowStrategy::Discard:
+                    // 丢弃新元素策略:如果满了,直接返回 false
+                    return false;
+
+                case OverflowStrategy::DiscardOldest:
+                    // 丢弃最旧元素策略:移除队首元素
+                    // (理论上 size >= max_size > 0 时 queue_ 不会是 empty)
+                    if (!queue_.empty()) {
+                         queue_.pop();
+                         // 注意:这里不需要 notify cond_var_not_full_,
+                         // 因为我们马上要 push 一个新元素,队列大小净变化为0。
+                         // 其他线程的 pop 操作会负责 notify。
+                    }
+                    break; // 跳出 switch,继续执行 push
+            }
+        }
+
+        // 执行推入操作 (对于 Block 策略在等待后执行,对于 DiscardOldest 在 pop 后执行)
         queue_.push(item);
-        cond_var_.notify_one();
-        if (push_callback_) {  // 触发回调
+
+        // 通知可能在等待的消费者 (wait_pop)
+        cond_var_not_empty_.notify_one();
+
+        // 如果设置了回调,则触发回调 (仍然持有锁)
+        if (push_callback_) {
             push_callback_();
         }
+
+        // 锁将在 unique_lock 析构时自动释放
+        return true; // 新元素已成功进入队列
     }
 
-    void push(T&& item) {
-        std::lock_guard<std::mutex> lock(mutex_);
+    /**
+     * @brief 将一个元素推入队列 (移动方式)。
+     * 根据构造时设置的策略,在队列满时可能阻塞、丢弃新元素或丢弃旧元素。
+     * @param item 要推入的元素 (将被移动)。
+     * @return 如果新元素成功进入队列(对于Block和DiscardOldest策略总是true,对于Discard策略在未满时为true),则返回 true;
+     *         如果策略为Discard且队列已满导致新元素被丢弃,则返回 false。
+     */
+    bool push(T&& item) {
+        std::unique_lock<std::mutex> lock(mutex_);
+
+         // 检查队列是否已满,并根据策略处理
+        if (queue_.size() >= max_size_) {
+            switch (strategy_) {
+                case OverflowStrategy::Block:
+                    // 阻塞策略:等待直到队列不再满
+                    cond_var_not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
+                    break; // 跳出 switch,继续执行 push
+
+                case OverflowStrategy::Discard:
+                    // 丢弃新元素策略:如果满了,直接返回 false
+                    return false;
+
+                case OverflowStrategy::DiscardOldest:
+                    // 丢弃最旧元素策略:移除队首元素
+                    if (!queue_.empty()) {
+                         queue_.pop();
+                         // 同上,不需要 notify cond_var_not_full_
+                    }
+                    break; // 跳出 switch,继续执行 push
+            }
+        }
+
+        // 执行推入操作
         queue_.push(std::move(item));
-        cond_var_.notify_one();
-        if (push_callback_) {  // 触发回调
+
+        // 通知可能在等待的消费者 (wait_pop)
+        cond_var_not_empty_.notify_one();
+
+        // 如果设置了回调,则触发回调 (仍然持有锁)
+        if (push_callback_) {
             push_callback_();
         }
+
+        // 锁将在 unique_lock 析构时自动释放
+        return true; // 新元素已成功进入队列
     }
 
+    /**
+     * @brief 尝试从队列中弹出一个元素,非阻塞。
+     * @param[out] item 用于存储弹出元素的引用。
+     * @return 如果成功弹出一个元素,返回 true;如果队列为空,返回 false。
+     */
     bool try_pop(T& item) {
         std::lock_guard<std::mutex> lock(mutex_);
         if (queue_.empty()) {
@@ -41,33 +162,74 @@ public:
         }
         item = std::move(queue_.front());
         queue_.pop();
+
+        // 通知可能在等待的生产者 (push, Block策略),队列现在有空间了
+        cond_var_not_full_.notify_one();
         return true;
     }
 
+    /**
+     * @brief 等待直到队列中有元素可用,然后弹出并返回该元素。
+     * @return 弹出的元素。
+     */
     T wait_pop() {
         std::unique_lock<std::mutex> lock(mutex_);
-        cond_var_.wait(lock, [this] { return !queue_.empty(); });
+        // 等待直到队列不为空
+        cond_var_not_empty_.wait(lock, [this] { return !queue_.empty(); });
+
         T item = std::move(queue_.front());
         queue_.pop();
-        return item;
+
+        // 通知可能在等待的生产者 (push, Block策略),队列现在有空间了
+        cond_var_not_full_.notify_one();
+
+        // 锁将在 unique_lock 析构时自动释放
+        return item; // RVO 或移动构造函数会高效处理
     }
 
+    /**
+     * @brief 检查队列是否为空。
+     * @return 如果队列为空,返回 true;否则返回 false。
+     */
     bool empty() const {
         std::lock_guard<std::mutex> lock(mutex_);
         return queue_.empty();
     }
 
+    /**
+     * @brief 返回队列中当前的元素数量。
+     * @return 元素数量。
+     */
     size_t size() const {
         std::lock_guard<std::mutex> lock(mutex_);
         return queue_.size();
     }
 
+    /**
+     * @brief 返回队列的最大容量。
+     * @return 最大容量。
+     */
+    size_t capacity() const {
+        return max_size_;
+    }
+
+    /**
+     * @brief 获取当前设置的溢出处理策略。
+     * @return 当前的 OverflowStrategy。
+     */
+    OverflowStrategy get_overflow_strategy() const {
+        return strategy_;
+    }
+
+
 private:
-    mutable std::mutex mutex_;
+    mutable std::mutex mutex_; // mutable 允许在 const 方法中 lock
     std::queue<T> queue_;
-    std::condition_variable cond_var_;
-    // 回调函数,用于通知节点的条件变量
-    std::function<void()> push_callback_;
+    std::condition_variable cond_var_not_empty_; // 用于通知消费者:队列不再为空
+    std::condition_variable cond_var_not_full_;  // 用于通知生产者:队列不再满 (主要用于 Block 策略)
+    const size_t max_size_;                // 最大队列大小
+    const OverflowStrategy strategy_;      // 队列满时的处理策略
+    std::function<void()> push_callback_; // 推入成功后的回调函数
 };
 
-#endif  // QUEUE_HPP__
+#endif // QUEUE_HPP__

+ 8 - 4
src/main.cpp

@@ -30,6 +30,10 @@ void test_depth()
 
 void test_yolo()
 {
+    OverflowStrategy stage = OverflowStrategy::Block;
+    int max_size = 100;
+
+
     // std::vector<std::string> names = { "person", "clothes", "vest" };
     std::vector<std::string> names = { "person", "car", "close", "open" };
     // std::shared_ptr<GNode::StreamNode> src_node0   = std::make_shared<GNode::StreamNode>("src0", "rtsp://admin:lww123456@172.16.22.16:554/Streaming/Channels/201", 0, GNode::DecodeType::GPU);
@@ -48,10 +52,10 @@ void test_yolo()
     record_node->set_fps(25);
     record_node->set_fourcc(cv::VideoWriter::fourcc('X', '2', '6', '4'));
     
-    GNode::LinkNode(src_node0, infer_node);
-    GNode::LinkNode(infer_node, track_node);
-    GNode::LinkNode(track_node, draw_node);
-    GNode::LinkNode(draw_node, record_node);
+    GNode::LinkNode(src_node0, infer_node, max_size, stage);
+    GNode::LinkNode(infer_node, track_node, max_size, stage);
+    GNode::LinkNode(track_node, draw_node, max_size, stage);
+    GNode::LinkNode(draw_node, record_node, max_size, stage);
     record_node->start();
     draw_node->start();
     track_node->start();

+ 1 - 1
src/nodes/base/base.hpp

@@ -97,7 +97,7 @@ protected:
 };
 
 static inline void LinkNode(const std::shared_ptr<BaseNode> &front,
-    const std::shared_ptr<BaseNode> &back) 
+    const std::shared_ptr<BaseNode> &back, int queue_size = 100, OverflowStrategy strategy = OverflowStrategy::Block) 
 {
     auto queue = std::make_shared<SharedQueue<std::shared_ptr<meta::MetaData>>>();
     back->add_input_buffer(front->get_name(), queue);

+ 1 - 1
src/nodes/infer/inferNode.cpp

@@ -26,7 +26,7 @@ void InferNode::work()
     printf("InferNode %s\n", name_.c_str());
     while (running_)
     {
-        Timer timer("InferNode");
+        // Timer timer("InferNode");
         bool has_data = false;
         for (auto& input_buffer : input_buffers_)
         {

+ 2 - 2
src/nodes/stream/streamNode.cpp

@@ -73,7 +73,7 @@ void StreamNode::work_gpu()
         }
         int ndecoded_frame = decoder_->decode(packet_data, packet_size, pts);
         for(int i = 0; i < ndecoded_frame; ++i){
-            Timer timer("StreamNode");
+            // Timer timer("StreamNode");
             cv::Mat frame(decoder_->get_height(), decoder_->get_width(), CV_8UC3, decoder_->get_frame(&pts, &frame_index));
             frame_index = frame_index + 1;
             
@@ -95,7 +95,7 @@ void StreamNode::work_gpu()
             std::this_thread::sleep_for(std::chrono::milliseconds(30));
         }
     };
-	printf("C++ Demo: %d frames\n", frame_index);
+	printf("Total frame : %d frames\n", frame_index);
 }