#ifndef STREAMNODE_HPP__ #define STREAMNODE_HPP__ #include "nodes/base/base.hpp" #include "opencv2/opencv.hpp" #include "stream/stream.hpp" #include #include // 日志库 #include "plog/Log.h" #include "plog/Initializers/RollingFileInitializer.h" namespace GNode { enum class DecodeType { CPU = 0, GPU = 1, FOLDER = 2 }; enum class StreamStatus{ OPENED = 0, CLOSED = 1, OPEN_FAILED = 2, ERROR = 3 }; class StreamNode : public BaseNode { public: StreamNode() = delete; // Constructor now only initializes members and tries initial open StreamNode(const std::string& name, const std::string& url, int gpu_id=0, DecodeType type=DecodeType::GPU, int retry_delay_ms = 5000) : BaseNode(name, NODE_TYPE::SRC_NODE), stream_url_(url), gpu_id_(gpu_id), decode_type_(type), retry_delay_ms_(retry_delay_ms) { PLOGI.printf("StreamNode [%s]: Created for URL: %s (Decode: %s)", name_.c_str(), stream_url_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU")); open_stream(); } virtual ~StreamNode() { stop(); // Ensure stop is called (should set running_ to false) close_stream(); // Clean up resources PLOGI.printf("StreamNode [%s] destroyed.", name_.c_str()); }; void set_stream_url(const std::string& stream_url) { stream_url_ = stream_url; PLOGI.printf("StreamNode [%s] URL set to: %s", name_.c_str(), stream_url_.c_str()); } void set_skip_frame(int skip_frame) { if (skip_frame < 1) { skip_frame = 1; } PLOGI.printf("StreamNode [%s] Skip frame set to: %d", name_.c_str(), skip_frame); skip_frame_ = skip_frame; } void set_retry_delay(int delay_ms) { retry_delay_ms_ = std::max(100, delay_ms); // Ensure a minimum delay PLOGI.printf("StreamNode [%s] Retry delay set to: %d ms", name_.c_str(), retry_delay_ms_); } StreamStatus get_status() const { return status_; } void work() override; // Main loop with reconnection logic private: bool open_stream(); // Attempts to open the stream (CPU or GPU) void close_stream(); // Closes the stream and resets resources void process_stream_cpu(); // Renamed processing function void process_stream_gpu(); // Renamed processing function void process_stream_folder(); std::string stream_url_; int skip_frame_ = 1; int frame_count_ = -1; // Reset when stream (re)opens int gpu_id_ = 0; int retry_delay_ms_; // Delay between reconnection attempts int fps_ = 20; std::shared_ptr cap_ = nullptr; std::shared_ptr demuxer_ = nullptr; std::shared_ptr decoder_ = nullptr; DecodeType decode_type_ = DecodeType::GPU; StreamStatus status_ = StreamStatus::CLOSED; // Initial state before first open attempt }; } // namespace GNode #endif // STREAMNODE_HPP__