123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- #include "nodes/stream/streamNode.hpp"
- #include "common/utils.hpp" // Assuming Timer might be here, if used
- #include <iostream> // For std::cerr
- #include <thread> // For std::this_thread::sleep_for
- #include <chrono> // For std::chrono::milliseconds
- namespace GNode
- {
- void StreamNode::close_stream() {
- PLOGI.printf("StreamNode [%s]: Closing stream...", name_.c_str());
- cap_.reset();
- decoder_.reset();
- demuxer_.reset();
- status_ = StreamStatus::CLOSED;
- frame_count_ = -1;
- }
- bool StreamNode::open_stream() {
- close_stream();
- PLOGI.printf("StreamNode [%s]: Attempting to open stream: %s", name_.c_str(), stream_url_.c_str());
- status_ = StreamStatus::CLOSED;
- if (decode_type_ == DecodeType::GPU)
- {
- demuxer_ = FFHDDemuxer::create_ffmpeg_demuxer(stream_url_);
- if (demuxer_ == nullptr)
- {
- PLOGE.printf("StreamNode [%s] Error: GPU demuxer creation failed for %s", name_.c_str(), stream_url_.c_str());
- status_ = StreamStatus::OPEN_FAILED;
- return false;
- }
- auto codec_id = demuxer_->get_video_codec();
- decoder_ = FFHDDecoder::create_cuvid_decoder(
- false, FFHDDecoder::ffmpeg2NvCodecId(codec_id), -1, gpu_id_, nullptr, nullptr, true
- );
- if (decoder_ == nullptr)
- {
- PLOGE.printf("StreamNode [%s] Error: GPU decoder creation failed for %s (Codec: %d)", name_.c_str(), stream_url_.c_str(), codec_id);
- demuxer_.reset(); // Clean up demuxer if decoder fails
- status_ = StreamStatus::OPEN_FAILED;
- return false;
- }
- printf("StreamNode [%s]: GPU Demuxer and Decoder created successfully.", name_.c_str());
- status_ = StreamStatus::OPENED;
- }
- else
- {
- cap_ = std::make_shared<cv::VideoCapture>();
- if (!cap_->open(stream_url_))
- {
- PLOGI.printf("StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s", name_.c_str(), stream_url_.c_str());
- cap_.reset(); // Release the failed object
- status_ = StreamStatus::OPEN_FAILED;
- return false;
- }
- if (!cap_->isOpened()) // Double check
- {
- PLOGE.printf("StreamNode [%s] Error: CPU cv::VideoCapture not opened after call for %s", name_.c_str(), stream_url_.c_str());
- cap_.reset();
- status_ = StreamStatus::OPEN_FAILED;
- return false;
- }
- PLOGI.printf("StreamNode [%s]: CPU cv::VideoCapture opened successfully.", name_.c_str());
- status_ = StreamStatus::OPENED;
- }
- frame_count_ = -1;
- return true;
- }
- void StreamNode::work()
- {
- PLOGI.printf("StreamNode [%s] starting work loop. Decode type: %s",
- name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
- while (running_)
- {
- if (status_ != StreamStatus::OPENED)
- {
- PLOGI.printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...",
- name_.c_str(), static_cast<int>(status_));
- if (open_stream())
- {
- PLOGI.printf("StreamNode [%s]: Stream opened successfully.", name_.c_str());
- }
- else
- {
- PLOGI.printf("StreamNode [%s]: Failed to open stream. Retrying in %d ms...",
- name_.c_str(), retry_delay_ms_);
- status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
- auto wakeUpTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(retry_delay_ms_);
- while (running_ && std::chrono::steady_clock::now() < wakeUpTime) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep in smaller chunks
- }
- if (!running_) break;
- }
- }
- if (status_ == StreamStatus::OPENED)
- {
- PLOGI.printf("StreamNode [%s]: Starting stream processing...", name_.c_str());
- if (decode_type_ == DecodeType::CPU)
- {
- process_stream_cpu();
- }
- else
- {
- process_stream_gpu();
- }
- PLOGI.printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).",
- name_.c_str(), static_cast<int>(status_));
- if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR)
- {
- close_stream();
- PLOGI.printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.", name_.c_str());
- }
- }
- else
- {
- PLOGD.printf("StreamNode [%s]: Unexpected status %d in work loop.", name_.c_str(), static_cast<int>(status_));
- std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
- }
- }
- PLOGI.printf("StreamNode [%s] work loop finished.", name_.c_str());
- close_stream();
- }
- void StreamNode::process_stream_cpu()
- {
- if (!cap_ || !cap_->isOpened()) {
- PLOGD.printf("StreamNode [%s] Error: process_stream_cpu called with closed/invalid VideoCapture.", name_.c_str());
- status_ = StreamStatus::ERROR; // Indicate an unexpected state
- return;
- }
- PLOGI.printf("StreamNode [%s]: Processing CPU stream...", name_.c_str());
- while (running_ && status_ == StreamStatus::OPENED)
- {
- cv::Mat frame;
- bool success = false;
- try {
- success = cap_->read(frame);
- } catch (const cv::Exception& ex) {
- PLOGE.printf("StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s", name_.c_str(), ex.what());
- status_ = StreamStatus::ERROR;
- break;
- }
- if (!success || frame.empty())
- {
- PLOGE.printf("StreamNode [%s]: Cannot read frame (End of stream or error).", name_.c_str());
- status_ = StreamStatus::CLOSED;
- break;
- }
- frame_count_++;
- if (frame_count_ % skip_frame_ != 0)
- {
- continue; // Skip frame
- }
- auto metaData = std::make_shared<meta::MetaData>();
- metaData->image = frame.clone();
- metaData->from = name_;
- send_output_data(metaData);
- }
- PLOGI.printf("StreamNode [%s]: Exiting CPU processing loop (Running: %s, Status: %d).",
- name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
- }
- void StreamNode::process_stream_gpu()
- {
- if (!demuxer_ || !decoder_) {
- PLOGE.printf("StreamNode [%s] Error: process_stream_gpu called with invalid demuxer/decoder.", name_.c_str());
- status_ = StreamStatus::ERROR;
- return;
- }
- printf("StreamNode [%s]: Processing GPU stream...", name_.c_str());
- uint8_t* packet_data = nullptr;
- int packet_size = 0;
- int64_t pts = 0;
- // Send extradata once (important for some codecs)
- demuxer_->get_extra_data(&packet_data, &packet_size);
- if (packet_size > 0) {
- PLOGI.printf("StreamNode [%s]: Sending %d bytes of extradata to decoder.", name_.c_str(), packet_size);
- decoder_->decode(packet_data, packet_size);
- } else {
- PLOGI.printf("StreamNode [%s]: No extradata found or needed.", name_.c_str());
- }
- unsigned int frame_index = 0; // Consider if this should be member if state needs preserving across reconnects
- while(running_ && status_ == StreamStatus::OPENED)
- {
- // Demux next packet
- bool demux_ok = false;
- try
- {
- demux_ok = demuxer_->demux(&packet_data, &packet_size, &pts);
- }
- catch (const std::exception& ex) { // Catch potential exceptions from demuxer implementation
- PLOGE.printf("StreamNode [%s] Error: Exception during demuxer_->demux(): %s", name_.c_str(), ex.what());
- status_ = StreamStatus::ERROR;
- break;
- }
- if (!demux_ok || packet_size <= 0 || !running_) // Check running_ again after potentially blocking demux call
- {
- PLOGI.printf("StreamNode [%s]: Demuxing finished or failed (packet_size: %d, running: %s).",
- name_.c_str(), packet_size, running_ ? "true":"false");
- status_ = StreamStatus::CLOSED; // Assume normal end or recoverable error
- break; // Exit processing loop
- }
- // Decode the packet
- int ndecoded_frame = 0;
- try
- {
- ndecoded_frame = decoder_->decode(packet_data, packet_size, pts);
- }
- catch (const std::exception& ex)
- {
- PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->decode(): %s", name_.c_str(), ex.what());
- status_ = StreamStatus::ERROR;
- break;
- }
- if (ndecoded_frame < 0)
- {
- PLOGE.printf("StreamNode [%s] Error: Decoder returned error (%d).", name_.c_str(), ndecoded_frame);
- status_ = StreamStatus::ERROR; // Treat decoder error as critical
- break; // Exit processing loop
- }
- // Process decoded frames
- for(int i = 0; i < ndecoded_frame; ++i)
- {
- // Timer timer("StreamNode GPU Frame"); // If using Timer utility
- uint8_t* frame_data = nullptr;
- int64_t frame_pts = 0;
- try
- {
- // Pass pointers to get the actual index and PTS for the current frame
- frame_data = decoder_->get_frame(&frame_pts, &frame_index);
- }
- catch (const std::exception& ex)
- {
- PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->get_frame(): %s", name_.c_str(), ex.what());
- status_ = StreamStatus::ERROR;
- ndecoded_frame = 0; // Stop processing frames from this packet
- break; // Break inner frame loop
- }
- if (!frame_data)
- {
- PLOGE.printf("StreamNode [%s] Error: Decoder returned null frame data for frame %d.", name_.c_str(), i);
- status_ = StreamStatus::ERROR; // Treat null frame data as error
- ndecoded_frame = 0; // Stop processing frames from this packet
- break; // Break inner frame loop
- }
- // Update overall frame count and check skip logic
- frame_count_++;
- if (frame_count_ % skip_frame_ != 0)
- {
- continue; // Skip this decoded frame
- }
- cv::Mat frame_gpu(decoder_->get_height(), decoder_->get_width(), CV_8UC3, frame_data);
- // Create metadata and copy the frame data
- auto metaData = std::make_shared<meta::MetaData>();
- metaData->image = frame_gpu.clone(); // CLONE is crucial here!
- metaData->from = name_;
- send_output_data(metaData);
- }
- if (status_ == StreamStatus::ERROR)
- {
- break;
- }
- };
- PLOGI.printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).",
- name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);
- }
- } // namespace GNode
|