|
@@ -10,7 +10,7 @@ namespace GNode
|
|
// --- Private Helper Methods ---
|
|
// --- Private Helper Methods ---
|
|
|
|
|
|
void StreamNode::close_stream() {
|
|
void StreamNode::close_stream() {
|
|
- printf("StreamNode [%s]: Closing stream...\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Closing stream...\n", name_.c_str());
|
|
// Reset pointers, which will call destructors if they are unique owners
|
|
// Reset pointers, which will call destructors if they are unique owners
|
|
cap_.reset();
|
|
cap_.reset();
|
|
decoder_.reset(); // Decoder depends on demuxer info, close it first potentially
|
|
decoder_.reset(); // Decoder depends on demuxer info, close it first potentially
|
|
@@ -23,7 +23,7 @@ bool StreamNode::open_stream() {
|
|
// Ensure any previous stream is closed before opening a new one
|
|
// Ensure any previous stream is closed before opening a new one
|
|
close_stream();
|
|
close_stream();
|
|
|
|
|
|
- printf("StreamNode [%s]: Attempting to open stream: %s\n", name_.c_str(), stream_url_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Attempting to open stream: %s\n", name_.c_str(), stream_url_.c_str());
|
|
status_ = StreamStatus::CLOSED; // Start as closed before trying
|
|
status_ = StreamStatus::CLOSED; // Start as closed before trying
|
|
|
|
|
|
if (decode_type_ == DecodeType::GPU)
|
|
if (decode_type_ == DecodeType::GPU)
|
|
@@ -31,7 +31,7 @@ bool StreamNode::open_stream() {
|
|
demuxer_ = FFHDDemuxer::create_ffmpeg_demuxer(stream_url_);
|
|
demuxer_ = FFHDDemuxer::create_ffmpeg_demuxer(stream_url_);
|
|
if (demuxer_ == nullptr)
|
|
if (demuxer_ == nullptr)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: GPU demuxer creation failed for %s\n", name_.c_str(), stream_url_.c_str());
|
|
|
|
|
|
+ PLOGI.fprintf(stderr, "StreamNode [%s] Error: GPU demuxer creation failed for %s\n", name_.c_str(), stream_url_.c_str());
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -44,7 +44,7 @@ bool StreamNode::open_stream() {
|
|
|
|
|
|
if (decoder_ == nullptr)
|
|
if (decoder_ == nullptr)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: GPU decoder creation failed for %s (Codec: %d)\n", name_.c_str(), stream_url_.c_str(), codec_id);
|
|
|
|
|
|
+ PLOGI.fprintf(stderr, "StreamNode [%s] Error: GPU decoder creation failed for %s (Codec: %d)\n", name_.c_str(), stream_url_.c_str(), codec_id);
|
|
demuxer_.reset(); // Clean up demuxer if decoder fails
|
|
demuxer_.reset(); // Clean up demuxer if decoder fails
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
return false;
|
|
return false;
|
|
@@ -59,19 +59,19 @@ bool StreamNode::open_stream() {
|
|
// cap_->open(stream_url_, cv::CAP_FFMPEG);
|
|
// cap_->open(stream_url_, cv::CAP_FFMPEG);
|
|
if (!cap_->open(stream_url_)) // Check return value of open
|
|
if (!cap_->open(stream_url_)) // Check return value of open
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s\n", name_.c_str(), stream_url_.c_str());
|
|
|
|
|
|
+ PLOGI.fprintf(stderr, "StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s\n", name_.c_str(), stream_url_.c_str());
|
|
cap_.reset(); // Release the failed object
|
|
cap_.reset(); // Release the failed object
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
status_ = StreamStatus::OPEN_FAILED;
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
if (!cap_->isOpened()) // Double check
|
|
if (!cap_->isOpened()) // Double check
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: CPU cv::VideoCapture not opened after call for %s\n", name_.c_str(), stream_url_.c_str());
|
|
|
|
- cap_.reset();
|
|
|
|
- status_ = StreamStatus::OPEN_FAILED;
|
|
|
|
- return false;
|
|
|
|
|
|
+ PLOGI.fprintf(stderr, "StreamNode [%s] Error: CPU cv::VideoCapture not opened after call for %s\n", name_.c_str(), stream_url_.c_str());
|
|
|
|
+ cap_.reset();
|
|
|
|
+ status_ = StreamStatus::OPEN_FAILED;
|
|
|
|
+ return false;
|
|
}
|
|
}
|
|
- printf("StreamNode [%s]: CPU cv::VideoCapture opened successfully.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: CPU cv::VideoCapture opened successfully.\n", name_.c_str());
|
|
status_ = StreamStatus::OPENED;
|
|
status_ = StreamStatus::OPENED;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -84,24 +84,24 @@ bool StreamNode::open_stream() {
|
|
|
|
|
|
void StreamNode::work()
|
|
void StreamNode::work()
|
|
{
|
|
{
|
|
- printf("StreamNode [%s] starting work loop. Decode type: %s\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s] starting work loop. Decode type: %s\n",
|
|
name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
|
|
name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
|
|
while (running_) // Main loop continues as long as the node is supposed to run
|
|
while (running_) // Main loop continues as long as the node is supposed to run
|
|
{
|
|
{
|
|
if (status_ != StreamStatus::OPENED) // Check if stream needs opening/reopening
|
|
if (status_ != StreamStatus::OPENED) // Check if stream needs opening/reopening
|
|
{
|
|
{
|
|
- printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...\n",
|
|
name_.c_str(), static_cast<int>(status_));
|
|
name_.c_str(), static_cast<int>(status_));
|
|
|
|
|
|
if (open_stream()) // Try to open
|
|
if (open_stream()) // Try to open
|
|
{
|
|
{
|
|
- printf("StreamNode [%s]: Stream opened successfully.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Stream opened successfully.\n", name_.c_str());
|
|
// Continue to processing immediately after successful open
|
|
// Continue to processing immediately after successful open
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
// Opening failed, wait before retrying
|
|
// Opening failed, wait before retrying
|
|
- fprintf(stderr, "StreamNode [%s]: Failed to open stream. Retrying in %d ms...\n",
|
|
|
|
|
|
+ PLOGI.fprintf(stderr, "StreamNode [%s]: Failed to open stream. Retrying in %d ms...\n",
|
|
name_.c_str(), retry_delay_ms_);
|
|
name_.c_str(), retry_delay_ms_);
|
|
status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
|
|
status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
|
|
|
|
|
|
@@ -119,7 +119,7 @@ void StreamNode::work()
|
|
// If we reach here, the stream should be OPENED
|
|
// If we reach here, the stream should be OPENED
|
|
if (status_ == StreamStatus::OPENED)
|
|
if (status_ == StreamStatus::OPENED)
|
|
{
|
|
{
|
|
- printf("StreamNode [%s]: Starting stream processing...\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Starting stream processing...\n", name_.c_str());
|
|
if (decode_type_ == DecodeType::CPU)
|
|
if (decode_type_ == DecodeType::CPU)
|
|
{
|
|
{
|
|
process_stream_cpu();
|
|
process_stream_cpu();
|
|
@@ -130,13 +130,13 @@ void StreamNode::work()
|
|
}
|
|
}
|
|
// After processing function returns, the stream might be closed or encountered an error.
|
|
// After processing function returns, the stream might be closed or encountered an error.
|
|
// The loop will re-evaluate the status_ at the beginning.
|
|
// The loop will re-evaluate the status_ at the beginning.
|
|
- printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).\n",
|
|
name_.c_str(), static_cast<int>(status_));
|
|
name_.c_str(), static_cast<int>(status_));
|
|
|
|
|
|
if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR)
|
|
if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR)
|
|
{
|
|
{
|
|
- close_stream(); // Ensure resources are released if processing stopped abnormally
|
|
|
|
- printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.\n", name_.c_str());
|
|
|
|
|
|
+ close_stream(); // Ensure resources are released if processing stopped abnormally
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.\n", name_.c_str());
|
|
// Optional short delay even after normal close before retry?
|
|
// Optional short delay even after normal close before retry?
|
|
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
}
|
|
}
|
|
@@ -144,13 +144,13 @@ void StreamNode::work()
|
|
else
|
|
else
|
|
{
|
|
{
|
|
// Should not happen if open_stream logic is correct, but good for debugging
|
|
// Should not happen if open_stream logic is correct, but good for debugging
|
|
- fprintf(stderr, "StreamNode [%s]: Unexpected status %d in work loop.\n", name_.c_str(), static_cast<int>(status_));
|
|
|
|
- std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
|
|
|
|
|
|
+ PLOGD.printf("StreamNode [%s]: Unexpected status %d in work loop.\n", name_.c_str(), static_cast<int>(status_));
|
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
|
|
}
|
|
}
|
|
|
|
|
|
} // End while(running_)
|
|
} // End while(running_)
|
|
|
|
|
|
- printf("StreamNode [%s] work loop finished.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s] work loop finished.\n", name_.c_str());
|
|
close_stream(); // Final cleanup
|
|
close_stream(); // Final cleanup
|
|
}
|
|
}
|
|
|
|
|
|
@@ -160,12 +160,12 @@ void StreamNode::work()
|
|
void StreamNode::process_stream_cpu()
|
|
void StreamNode::process_stream_cpu()
|
|
{
|
|
{
|
|
if (!cap_ || !cap_->isOpened()) {
|
|
if (!cap_ || !cap_->isOpened()) {
|
|
- fprintf(stderr, "StreamNode [%s] Error: process_stream_cpu called with closed/invalid VideoCapture.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGD.printf("StreamNode [%s] Error: process_stream_cpu called with closed/invalid VideoCapture.\n", name_.c_str());
|
|
status_ = StreamStatus::ERROR; // Indicate an unexpected state
|
|
status_ = StreamStatus::ERROR; // Indicate an unexpected state
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- printf("StreamNode [%s]: Processing CPU stream...\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Processing CPU stream...\n", name_.c_str());
|
|
while (running_ && status_ == StreamStatus::OPENED)
|
|
while (running_ && status_ == StreamStatus::OPENED)
|
|
{
|
|
{
|
|
cv::Mat frame;
|
|
cv::Mat frame;
|
|
@@ -173,14 +173,14 @@ void StreamNode::process_stream_cpu()
|
|
try {
|
|
try {
|
|
success = cap_->read(frame);
|
|
success = cap_->read(frame);
|
|
} catch (const cv::Exception& ex) {
|
|
} catch (const cv::Exception& ex) {
|
|
- fprintf(stderr, "StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s\n", name_.c_str(), ex.what());
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s\n", name_.c_str(), ex.what());
|
|
status_ = StreamStatus::ERROR; // Treat OpenCV exception as an error
|
|
status_ = StreamStatus::ERROR; // Treat OpenCV exception as an error
|
|
break; // Exit processing loop
|
|
break; // Exit processing loop
|
|
}
|
|
}
|
|
|
|
|
|
if (!success || frame.empty())
|
|
if (!success || frame.empty())
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s]: Cannot read frame (End of stream or error).\n", name_.c_str());
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s]: Cannot read frame (End of stream or error).\n", name_.c_str());
|
|
status_ = StreamStatus::CLOSED; // Assume normal closure or recoverable error
|
|
status_ = StreamStatus::CLOSED; // Assume normal closure or recoverable error
|
|
break; // Exit processing loop, work() will handle retry/stop
|
|
break; // Exit processing loop, work() will handle retry/stop
|
|
}
|
|
}
|
|
@@ -204,14 +204,14 @@ void StreamNode::process_stream_cpu()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- printf("StreamNode [%s]: Exiting CPU processing loop (Running: %s, Status: %d).\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Exiting CPU processing loop (Running: %s, Status: %d).\n",
|
|
name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
|
|
name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
|
|
}
|
|
}
|
|
|
|
|
|
void StreamNode::process_stream_gpu()
|
|
void StreamNode::process_stream_gpu()
|
|
{
|
|
{
|
|
if (!demuxer_ || !decoder_) {
|
|
if (!demuxer_ || !decoder_) {
|
|
- fprintf(stderr, "StreamNode [%s] Error: process_stream_gpu called with invalid demuxer/decoder.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: process_stream_gpu called with invalid demuxer/decoder.\n", name_.c_str());
|
|
status_ = StreamStatus::ERROR;
|
|
status_ = StreamStatus::ERROR;
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -224,10 +224,10 @@ void StreamNode::process_stream_gpu()
|
|
// Send extradata once (important for some codecs)
|
|
// Send extradata once (important for some codecs)
|
|
demuxer_->get_extra_data(&packet_data, &packet_size);
|
|
demuxer_->get_extra_data(&packet_data, &packet_size);
|
|
if (packet_size > 0) {
|
|
if (packet_size > 0) {
|
|
- printf("StreamNode [%s]: Sending %d bytes of extradata to decoder.\n", name_.c_str(), packet_size);
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Sending %d bytes of extradata to decoder.\n", name_.c_str(), packet_size);
|
|
decoder_->decode(packet_data, packet_size);
|
|
decoder_->decode(packet_data, packet_size);
|
|
} else {
|
|
} else {
|
|
- printf("StreamNode [%s]: No extradata found or needed.\n", name_.c_str());
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: No extradata found or needed.\n", name_.c_str());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -236,17 +236,19 @@ void StreamNode::process_stream_gpu()
|
|
{
|
|
{
|
|
// Demux next packet
|
|
// Demux next packet
|
|
bool demux_ok = false;
|
|
bool demux_ok = false;
|
|
- try {
|
|
|
|
|
|
+ try
|
|
|
|
+ {
|
|
demux_ok = demuxer_->demux(&packet_data, &packet_size, &pts);
|
|
demux_ok = demuxer_->demux(&packet_data, &packet_size, &pts);
|
|
- } catch (const std::exception& ex) { // Catch potential exceptions from demuxer implementation
|
|
|
|
- fprintf(stderr, "StreamNode [%s] Error: Exception during demuxer_->demux(): %s\n", name_.c_str(), ex.what());
|
|
|
|
- status_ = StreamStatus::ERROR;
|
|
|
|
- break;
|
|
|
|
|
|
+ }
|
|
|
|
+ catch (const std::exception& ex) { // Catch potential exceptions from demuxer implementation
|
|
|
|
+ PLOGE.printf(stderr, "StreamNode [%s] Error: Exception during demuxer_->demux(): %s\n", name_.c_str(), ex.what());
|
|
|
|
+ status_ = StreamStatus::ERROR;
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
|
|
if (!demux_ok || packet_size <= 0 || !running_) // Check running_ again after potentially blocking demux call
|
|
if (!demux_ok || packet_size <= 0 || !running_) // Check running_ again after potentially blocking demux call
|
|
{
|
|
{
|
|
- printf("StreamNode [%s]: Demuxing finished or failed (packet_size: %d, running: %s).\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Demuxing finished or failed (packet_size: %d, running: %s).\n",
|
|
name_.c_str(), packet_size, running_ ? "true":"false");
|
|
name_.c_str(), packet_size, running_ ? "true":"false");
|
|
status_ = StreamStatus::CLOSED; // Assume normal end or recoverable error
|
|
status_ = StreamStatus::CLOSED; // Assume normal end or recoverable error
|
|
break; // Exit processing loop
|
|
break; // Exit processing loop
|
|
@@ -260,14 +262,14 @@ void StreamNode::process_stream_gpu()
|
|
}
|
|
}
|
|
catch (const std::exception& ex)
|
|
catch (const std::exception& ex)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: Exception during decoder_->decode(): %s\n", name_.c_str(), ex.what());
|
|
|
|
- status_ = StreamStatus::ERROR;
|
|
|
|
- break;
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->decode(): %s\n", name_.c_str(), ex.what());
|
|
|
|
+ status_ = StreamStatus::ERROR;
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
|
|
if (ndecoded_frame < 0)
|
|
if (ndecoded_frame < 0)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: Decoder returned error (%d).\n", name_.c_str(), ndecoded_frame);
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: Decoder returned error (%d).\n", name_.c_str(), ndecoded_frame);
|
|
status_ = StreamStatus::ERROR; // Treat decoder error as critical
|
|
status_ = StreamStatus::ERROR; // Treat decoder error as critical
|
|
break; // Exit processing loop
|
|
break; // Exit processing loop
|
|
}
|
|
}
|
|
@@ -286,15 +288,15 @@ void StreamNode::process_stream_gpu()
|
|
}
|
|
}
|
|
catch (const std::exception& ex)
|
|
catch (const std::exception& ex)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: Exception during decoder_->get_frame(): %s\n", name_.c_str(), ex.what());
|
|
|
|
- status_ = StreamStatus::ERROR;
|
|
|
|
- ndecoded_frame = 0; // Stop processing frames from this packet
|
|
|
|
- break; // Break inner frame loop
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->get_frame(): %s\n", name_.c_str(), ex.what());
|
|
|
|
+ status_ = StreamStatus::ERROR;
|
|
|
|
+ ndecoded_frame = 0; // Stop processing frames from this packet
|
|
|
|
+ break; // Break inner frame loop
|
|
}
|
|
}
|
|
|
|
|
|
if (!frame_data)
|
|
if (!frame_data)
|
|
{
|
|
{
|
|
- fprintf(stderr, "StreamNode [%s] Error: Decoder returned null frame data for frame %d.\n", name_.c_str(), i);
|
|
|
|
|
|
+ PLOGE.printf("StreamNode [%s] Error: Decoder returned null frame data for frame %d.\n", name_.c_str(), i);
|
|
status_ = StreamStatus::ERROR; // Treat null frame data as error
|
|
status_ = StreamStatus::ERROR; // Treat null frame data as error
|
|
ndecoded_frame = 0; // Stop processing frames from this packet
|
|
ndecoded_frame = 0; // Stop processing frames from this packet
|
|
break; // Break inner frame loop
|
|
break; // Break inner frame loop
|
|
@@ -346,7 +348,7 @@ void StreamNode::process_stream_gpu()
|
|
|
|
|
|
}; // End while(running_ && status_ == StreamStatus::OPENED)
|
|
}; // End while(running_ && status_ == StreamStatus::OPENED)
|
|
|
|
|
|
- printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).\n",
|
|
|
|
|
|
+ PLOGI.printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).\n",
|
|
name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);
|
|
name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);
|
|
}
|
|
}
|
|
|
|
|