leon il y a 4 semaines
Parent
commit
6c835bf7f1

+ 20 - 3
src/common/queue.hpp

@@ -13,6 +13,7 @@
  */
 enum class OverflowStrategy {
     Block,         //!< 阻塞生产者,直到队列有空间
+    BlockTimeout,  //!< 阻塞生产者,直到队列有空间或者超时,超时后丢弃
     Discard,       //!< 丢弃尝试推入的新元素,立即返回
     DiscardOldest  //!< 丢弃队列中最旧的元素(队首),然后推入新元素
 };
@@ -29,8 +30,9 @@ public:
      *                 默认为 OverflowStrategy::Block。
      */
     explicit SharedQueue(size_t max_size = 100,
-                         OverflowStrategy strategy = OverflowStrategy::Block)
-        : max_size_(max_size), strategy_(strategy)
+                         OverflowStrategy strategy = OverflowStrategy::BlockTimeout,
+                         std::chrono::milliseconds timeout = std::chrono::milliseconds(0))
+        : max_size_(max_size), strategy_(strategy), timeout_(timeout)
     {
         if (max_size_ == 0) {
             throw std::invalid_argument("SharedQueue 的 max_size 不能为 0");
@@ -70,6 +72,15 @@ public:
                     cond_var_not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
                     // 等待结束后,队列保证有空间
                     break; // 跳出 switch,继续执行 push
+                
+                case OverflowStrategy::BlockTimeout:
+                    // 带超时的阻塞策略
+                    if (!cond_var_not_full_.wait_for(lock, timeout_, [this] { return queue_.size() < max_size_; })) {
+                        // wait_for 返回 false 表示超时且条件仍不满足
+                        return false; // 推入失败 (超时)
+                    }
+                    // 等待成功 (未超时或被唤醒且条件满足),继续下一次 while 检查
+                    break; // 跳出 switch,继续 while 检查
 
                 case OverflowStrategy::Discard:
                     // 丢弃新元素策略:如果满了,直接返回 false
@@ -120,7 +131,12 @@ public:
                     // 阻塞策略:等待直到队列不再满
                     cond_var_not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
                     break; // 跳出 switch,继续执行 push
-
+                
+                case OverflowStrategy::BlockTimeout:
+                    if (!cond_var_not_full_.wait_for(lock, timeout_, [this] { return queue_.size() < max_size_; })) {
+                       return false; // 推入失败 (超时)
+                    }
+                   break;
                 case OverflowStrategy::Discard:
                     // 丢弃新元素策略:如果满了,直接返回 false
                     return false;
@@ -229,6 +245,7 @@ private:
     std::condition_variable cond_var_not_full_;  // 用于通知生产者:队列不再满 (主要用于 Block 策略)
     const size_t max_size_;                // 最大队列大小
     const OverflowStrategy strategy_;      // 队列满时的处理策略
+    const std::chrono::milliseconds timeout_; // 超时时间 (用于 BlockTimeout)
     std::function<void()> push_callback_; // 推入成功后的回调函数
 };
 

+ 5 - 3
src/main.cpp

@@ -39,14 +39,14 @@ void test_depth()
 
 void test_yolo()
 {
-    OverflowStrategy stage = OverflowStrategy::Block;
+    OverflowStrategy stage = OverflowStrategy::BlockTimeout;
     int max_size = 100;
 
 
     // std::vector<std::string> names = { "person", "clothes", "vest" };
     std::vector<std::string> names = { "person", "car", "close", "open" };
     // std::shared_ptr<GNode::StreamNode> src_node0   = std::make_shared<GNode::StreamNode>("src0", "rtsp://admin:lww123456@172.16.22.16:554/Streaming/Channels/201", 0, GNode::DecodeType::GPU);
-    std::shared_ptr<GNode::StreamNode> src_node0   = std::make_shared<GNode::StreamNode>("src0", "carperson.mp4", 0, GNode::DecodeType::GPU);
+    std::shared_ptr<GNode::StreamNode> src_node0   = std::make_shared<GNode::StreamNode>("src0", "carperson.mp4", 1, GNode::DecodeType::GPU);
     src_node0->set_skip_frame(1);
 
     std::shared_ptr<Infer> yolo_model = load("model/carperson.engine", ModelType::YOLOV5, names, 1, 0.25, 0.45);
@@ -132,9 +132,11 @@ int main()
 // 模型复用         完成,基类加锁保证一致性
 // 画图节点         完成
 // 推送节点         基本完成
+// 日志             完成
+
 
 // 分析节点         
 // YOLO11 seg 
 // 通过配置文件创建 pipeline
-// 日志
+
 // 设置电子围栏

+ 2 - 12
src/nodes/base/base.cpp

@@ -17,14 +17,9 @@ BaseNode::~BaseNode()
 
 void BaseNode::start()
 {
-    if (!running_)
+    if (!running_.exchange(true))
     {
         worker_thread_ = std::thread(&BaseNode::work, this);
-        running_ = true;
-    }
-    else
-    {
-        std::cerr << "Error: node is already running" << std::endl;
     }
 }
 
@@ -33,16 +28,11 @@ void BaseNode::stop()
     if (running_.exchange(false))
     {
         cond_var_->notify_all();
-        running_ = false;
         if (worker_thread_.joinable())
         {
             worker_thread_.join();
         }
-        printf("Node %s stopped\n", name_.c_str());
-    }
-    else
-    {
-        printf("Node %s is not running\n", name_.c_str());
+        PLOGI.printf("Node : [%s] stopped\n", name_.c_str());
     }
 }
 

+ 23 - 59
src/nodes/stream/streamNode.cpp

@@ -6,25 +6,19 @@
 
 namespace GNode
 {
-
-// --- Private Helper Methods ---
-
 void StreamNode::close_stream() {
     PLOGI.printf("StreamNode [%s]: Closing stream...", name_.c_str());
-    // Reset pointers, which will call destructors if they are unique owners
     cap_.reset();
-    decoder_.reset(); // Decoder depends on demuxer info, close it first potentially
+    decoder_.reset();
     demuxer_.reset();
     status_ = StreamStatus::CLOSED;
-    frame_count_ = -1; // Reset frame count for the next potential stream
+    frame_count_ = -1;
 }
 
 bool StreamNode::open_stream() {
-    // Ensure any previous stream is closed before opening a new one
     close_stream();
-
     PLOGI.printf("StreamNode [%s]: Attempting to open stream: %s", name_.c_str(), stream_url_.c_str());
-    status_ = StreamStatus::CLOSED; // Start as closed before trying
+    status_ = StreamStatus::CLOSED;
 
     if (decode_type_ == DecodeType::GPU)
     {
@@ -52,12 +46,10 @@ bool StreamNode::open_stream() {
         printf("StreamNode [%s]: GPU Demuxer and Decoder created successfully.", name_.c_str());
         status_ = StreamStatus::OPENED;
     }
-    else // DecodeType::CPU
+    else
     {
         cap_ = std::make_shared<cv::VideoCapture>();
-        // Optionally set backend preference if needed
-        // cap_->open(stream_url_, cv::CAP_FFMPEG);
-        if (!cap_->open(stream_url_)) // Check return value of open
+        if (!cap_->open(stream_url_))
         {
             PLOGI.printf("StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s", name_.c_str(), stream_url_.c_str());
             cap_.reset(); // Release the failed object
@@ -75,48 +67,41 @@ bool StreamNode::open_stream() {
         status_ = StreamStatus::OPENED;
     }
 
-    frame_count_ = -1; // Reset frame count upon successful open
+    frame_count_ = -1;
     return true;
 }
 
 
-// --- Main Work Loop ---
-
 void StreamNode::work()
 {
     PLOGI.printf("StreamNode [%s] starting work loop. Decode type: %s",
            name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
-    while (running_) // Main loop continues as long as the node is supposed to run
+    while (running_)
     {
-        if (status_ != StreamStatus::OPENED) // Check if stream needs opening/reopening
+        if (status_ != StreamStatus::OPENED)
         {
             PLOGI.printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...",
                    name_.c_str(), static_cast<int>(status_));
 
-            if (open_stream()) // Try to open
+            if (open_stream())
             {
                 PLOGI.printf("StreamNode [%s]: Stream opened successfully.", name_.c_str());
-                // Continue to processing immediately after successful open
             }
             else
             {
-                // Opening failed, wait before retrying
                 PLOGI.printf("StreamNode [%s]: Failed to open stream. Retrying in %d ms...",
                         name_.c_str(), retry_delay_ms_);
                 status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
 
-                // Wait for the specified delay, but check running_ periodically
                 auto wakeUpTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(retry_delay_ms_);
                 while (running_ && std::chrono::steady_clock::now() < wakeUpTime) {
                      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep in smaller chunks
                 }
 
-                if (!running_) break; // Exit outer loop if stopped during wait
-                continue; // Go back to the start of the outer loop to retry opening
+                if (!running_) break;
             }
         }
 
-        // If we reach here, the stream should be OPENED
         if (status_ == StreamStatus::OPENED)
         {
             PLOGI.printf("StreamNode [%s]: Starting stream processing...", name_.c_str());
@@ -128,34 +113,28 @@ void StreamNode::work()
             {
                 process_stream_gpu();
             }
-            // After processing function returns, the stream might be closed or encountered an error.
-            // The loop will re-evaluate the status_ at the beginning.
+
             PLOGI.printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).",
                    name_.c_str(), static_cast<int>(status_));
 
             if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR) 
             {
-                close_stream(); // Ensure resources are released if processing stopped abnormally
+                close_stream();
                 PLOGI.printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.", name_.c_str());
-                  // Optional short delay even after normal close before retry?
-                  // std::this_thread::sleep_for(std::chrono::milliseconds(100));
             }
         }
         else 
         {
-            // Should not happen if open_stream logic is correct, but good for debugging
             PLOGD.printf("StreamNode [%s]: Unexpected status %d in work loop.", name_.c_str(), static_cast<int>(status_));
             std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
         }
 
-    } // End while(running_)
+    }
 
     PLOGI.printf("StreamNode [%s] work loop finished.", name_.c_str());
-    close_stream(); // Final cleanup
+    close_stream();
 }
-
-
-// --- Stream Processing Methods ---
+-
 
 void StreamNode::process_stream_cpu()
 {
@@ -174,15 +153,15 @@ void StreamNode::process_stream_cpu()
              success = cap_->read(frame);
         } catch (const cv::Exception& ex) {
             PLOGE.printf("StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s", name_.c_str(), ex.what());
-            status_ = StreamStatus::ERROR; // Treat OpenCV exception as an error
-            break; // Exit processing loop
+            status_ = StreamStatus::ERROR;
+            break;
         }
 
         if (!success || frame.empty())
         {
             PLOGE.printf("StreamNode [%s]: Cannot read frame (End of stream or error).", name_.c_str());
-            status_ = StreamStatus::CLOSED; // Assume normal closure or recoverable error
-            break; // Exit processing loop, work() will handle retry/stop
+            status_ = StreamStatus::CLOSED; 
+            break;
         }
 
         frame_count_++;
@@ -192,14 +171,13 @@ void StreamNode::process_stream_cpu()
         }
 
         auto metaData = std::make_shared<meta::MetaData>();
-        // Use clone() to ensure the pushed data is independent if frame is reused by VideoCapture
         metaData->image = frame.clone();
         metaData->from = name_;
-        // metaData->timestamp = getCurrentTimestamp(); // Add timestamp if useful
 
         for (auto& output_buffer : output_buffers_)
         {
-            if (output_buffer.second) { // Check if buffer pointer is valid
+            if (output_buffer.second) 
+            { 
                  output_buffer.second->push(metaData);
             }
         }
@@ -310,17 +288,12 @@ void StreamNode::process_stream_gpu()
                 continue; // Skip this decoded frame
             }
 
-            // Important: Create cv::Mat header WITHOUT copying data yet.
-            // The data pointer (frame_data) is owned by the decoder.
-            // We need to clone it if we push it asynchronously.
             cv::Mat frame_gpu(decoder_->get_height(), decoder_->get_width(), CV_8UC3, frame_data);
 
             // Create metadata and copy the frame data
             auto metaData = std::make_shared<meta::MetaData>();
             metaData->image = frame_gpu.clone(); // CLONE is crucial here!
             metaData->from = name_;
-            // metaData->timestamp = getCurrentTimestamp(); // Or use frame_pts if preferred
-            // metaData->frame_idx = frame_count_; // Add frame index if needed downstream
 
             bool pushed = false;
             for (auto& output_buffer : output_buffers_)
@@ -331,22 +304,13 @@ void StreamNode::process_stream_gpu()
                     pushed = true;
                 }
             }
-             // Optional: Log if data wasn't pushed anywhere
-            // if (!pushed) {
-            //     printf("StreamNode [%s]: Warning - GPU Frame %d processed but not pushed to any output buffer.\n", name_.c_str(), frame_count_);
-            // }
-
-            // Optional delay if needed for rate limiting, but usually not here
-            // std::this_thread::sleep_for(std::chrono::milliseconds(1));
-        } // End loop over decoded frames
-
-        // If status changed to ERROR within the frame loop, break the outer while loop
+        }
         if (status_ == StreamStatus::ERROR) 
         {
              break;
         }
 
-    }; // End while(running_ && status_ == StreamStatus::OPENED)
+    };
 
 	PLOGI.printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).",
            name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);

+ 3 - 10
src/nodes/stream/streamNode.hpp

@@ -23,9 +23,9 @@ enum class DecodeType
 
 enum class StreamStatus{
     OPENED = 0,
-    CLOSED = 1,      // Indicates a potentially recoverable state (e.g., EOS, temporary network issue)
-    OPEN_FAILED = 2, // Indicates failure during the open attempt
-    ERROR = 3        // Indicates an unrecoverable error during processing
+    CLOSED = 1,
+    OPEN_FAILED = 2,
+    ERROR = 3
 };
 
 class StreamNode : public BaseNode
@@ -54,15 +54,8 @@ public:
 
     void set_stream_url(const std::string& stream_url)
     {
-        // Note: Changing URL while running might require more complex logic
-        // to stop, close, update, and restart. For simplicity, assume
-        // it's set before starting or requires manual stop/start.
         stream_url_ = stream_url;
         PLOGI.printf("StreamNode [%s] URL set to: %s", name_.c_str(), stream_url_.c_str());
-        // If called after initial failure, we might want to reset status
-        // if (status_ == StreamStatus::OPEN_FAILED) {
-        //     status_ = StreamStatus::CLOSED; // Allow work() to retry opening
-        // }
     }
 
     void set_skip_frame(int skip_frame)

+ 1 - 1
src/nodes/track/trackNode.hpp

@@ -23,7 +23,7 @@ public:
         track_label_ = track_label;
         frame_rate_ = frame_rate;
         track_buffer_ = track_buffer;
-        PLOGI.printf("TrackNode : [%s] Init. track label is %s, rate is %d, buffer is %d", name_.c_str(), track_label_.c_str(), frame_rate_, track_buffer_)
+        PLOGI.printf("TrackNode : [%s] Init. track label is %s, rate is %d, buffer is %d", name_.c_str(), track_label_.c_str(), frame_rate_, track_buffer_);
     }
     virtual ~TrackNode()  { stop(); };
     void work() override;