streamNode.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. #include "nodes/stream/streamNode.hpp"
  2. #include "common/utils.hpp" // Assuming Timer might be here, if used
  3. #include <iostream> // For std::cerr
  4. #include <thread> // For std::this_thread::sleep_for
  5. #include <chrono> // For std::chrono::milliseconds
  6. namespace GNode
  7. {
  8. void StreamNode::close_stream() {
  9. PLOGI.printf("StreamNode [%s]: Closing stream...", name_.c_str());
  10. cap_.reset();
  11. decoder_.reset();
  12. demuxer_.reset();
  13. status_ = StreamStatus::CLOSED;
  14. frame_count_ = -1;
  15. }
  16. bool StreamNode::open_stream() {
  17. close_stream();
  18. PLOGI.printf("StreamNode [%s]: Attempting to open stream: %s", name_.c_str(), stream_url_.c_str());
  19. status_ = StreamStatus::CLOSED;
  20. if (decode_type_ == DecodeType::GPU)
  21. {
  22. demuxer_ = FFHDDemuxer::create_ffmpeg_demuxer(stream_url_);
  23. if (demuxer_ == nullptr)
  24. {
  25. PLOGE.printf("StreamNode [%s] Error: GPU demuxer creation failed for %s", name_.c_str(), stream_url_.c_str());
  26. status_ = StreamStatus::OPEN_FAILED;
  27. return false;
  28. }
  29. auto codec_id = demuxer_->get_video_codec();
  30. decoder_ = FFHDDecoder::create_cuvid_decoder(
  31. false, FFHDDecoder::ffmpeg2NvCodecId(codec_id), -1, gpu_id_, nullptr, nullptr, true
  32. );
  33. if (decoder_ == nullptr)
  34. {
  35. PLOGE.printf("StreamNode [%s] Error: GPU decoder creation failed for %s (Codec: %d)", name_.c_str(), stream_url_.c_str(), codec_id);
  36. demuxer_.reset(); // Clean up demuxer if decoder fails
  37. status_ = StreamStatus::OPEN_FAILED;
  38. return false;
  39. }
  40. printf("StreamNode [%s]: GPU Demuxer and Decoder created successfully.", name_.c_str());
  41. status_ = StreamStatus::OPENED;
  42. }
  43. else
  44. {
  45. cap_ = std::make_shared<cv::VideoCapture>();
  46. if (!cap_->open(stream_url_))
  47. {
  48. PLOGI.printf("StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s", name_.c_str(), stream_url_.c_str());
  49. cap_.reset(); // Release the failed object
  50. status_ = StreamStatus::OPEN_FAILED;
  51. return false;
  52. }
  53. if (!cap_->isOpened()) // Double check
  54. {
  55. PLOGE.printf("StreamNode [%s] Error: CPU cv::VideoCapture not opened after call for %s", name_.c_str(), stream_url_.c_str());
  56. cap_.reset();
  57. status_ = StreamStatus::OPEN_FAILED;
  58. return false;
  59. }
  60. PLOGI.printf("StreamNode [%s]: CPU cv::VideoCapture opened successfully.", name_.c_str());
  61. status_ = StreamStatus::OPENED;
  62. }
  63. frame_count_ = -1;
  64. return true;
  65. }
  66. void StreamNode::work()
  67. {
  68. PLOGI.printf("StreamNode [%s] starting work loop. Decode type: %s",
  69. name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
  70. while (running_)
  71. {
  72. if (status_ != StreamStatus::OPENED)
  73. {
  74. PLOGI.printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...",
  75. name_.c_str(), static_cast<int>(status_));
  76. if (open_stream())
  77. {
  78. PLOGI.printf("StreamNode [%s]: Stream opened successfully.", name_.c_str());
  79. }
  80. else
  81. {
  82. PLOGI.printf("StreamNode [%s]: Failed to open stream. Retrying in %d ms...",
  83. name_.c_str(), retry_delay_ms_);
  84. status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
  85. auto wakeUpTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(retry_delay_ms_);
  86. while (running_ && std::chrono::steady_clock::now() < wakeUpTime) {
  87. std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep in smaller chunks
  88. }
  89. if (!running_) break;
  90. }
  91. }
  92. if (status_ == StreamStatus::OPENED)
  93. {
  94. PLOGI.printf("StreamNode [%s]: Starting stream processing...", name_.c_str());
  95. if (decode_type_ == DecodeType::CPU)
  96. {
  97. process_stream_cpu();
  98. }
  99. else
  100. {
  101. process_stream_gpu();
  102. }
  103. PLOGI.printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).",
  104. name_.c_str(), static_cast<int>(status_));
  105. if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR)
  106. {
  107. close_stream();
  108. PLOGI.printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.", name_.c_str());
  109. }
  110. }
  111. else
  112. {
  113. PLOGD.printf("StreamNode [%s]: Unexpected status %d in work loop.", name_.c_str(), static_cast<int>(status_));
  114. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
  115. }
  116. }
  117. PLOGI.printf("StreamNode [%s] work loop finished.", name_.c_str());
  118. close_stream();
  119. }
  120. void StreamNode::process_stream_cpu()
  121. {
  122. if (!cap_ || !cap_->isOpened()) {
  123. PLOGD.printf("StreamNode [%s] Error: process_stream_cpu called with closed/invalid VideoCapture.", name_.c_str());
  124. status_ = StreamStatus::ERROR; // Indicate an unexpected state
  125. return;
  126. }
  127. PLOGI.printf("StreamNode [%s]: Processing CPU stream...", name_.c_str());
  128. while (running_ && status_ == StreamStatus::OPENED)
  129. {
  130. cv::Mat frame;
  131. bool success = false;
  132. try {
  133. success = cap_->read(frame);
  134. } catch (const cv::Exception& ex) {
  135. PLOGE.printf("StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s", name_.c_str(), ex.what());
  136. status_ = StreamStatus::ERROR;
  137. break;
  138. }
  139. if (!success || frame.empty())
  140. {
  141. PLOGE.printf("StreamNode [%s]: Cannot read frame (End of stream or error).", name_.c_str());
  142. status_ = StreamStatus::CLOSED;
  143. break;
  144. }
  145. frame_count_++;
  146. if (frame_count_ % skip_frame_ != 0)
  147. {
  148. continue; // Skip frame
  149. }
  150. auto metaData = std::make_shared<meta::MetaData>();
  151. metaData->image = frame.clone();
  152. metaData->from = name_;
  153. send_output_data(metaData);
  154. }
  155. PLOGI.printf("StreamNode [%s]: Exiting CPU processing loop (Running: %s, Status: %d).",
  156. name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
  157. }
  158. void StreamNode::process_stream_gpu()
  159. {
  160. if (!demuxer_ || !decoder_) {
  161. PLOGE.printf("StreamNode [%s] Error: process_stream_gpu called with invalid demuxer/decoder.", name_.c_str());
  162. status_ = StreamStatus::ERROR;
  163. return;
  164. }
  165. printf("StreamNode [%s]: Processing GPU stream...", name_.c_str());
  166. uint8_t* packet_data = nullptr;
  167. int packet_size = 0;
  168. int64_t pts = 0;
  169. // Send extradata once (important for some codecs)
  170. demuxer_->get_extra_data(&packet_data, &packet_size);
  171. if (packet_size > 0) {
  172. PLOGI.printf("StreamNode [%s]: Sending %d bytes of extradata to decoder.", name_.c_str(), packet_size);
  173. decoder_->decode(packet_data, packet_size);
  174. } else {
  175. PLOGI.printf("StreamNode [%s]: No extradata found or needed.", name_.c_str());
  176. }
  177. unsigned int frame_index = 0; // Consider if this should be member if state needs preserving across reconnects
  178. while(running_ && status_ == StreamStatus::OPENED)
  179. {
  180. // Demux next packet
  181. bool demux_ok = false;
  182. try
  183. {
  184. demux_ok = demuxer_->demux(&packet_data, &packet_size, &pts);
  185. }
  186. catch (const std::exception& ex) { // Catch potential exceptions from demuxer implementation
  187. PLOGE.printf("StreamNode [%s] Error: Exception during demuxer_->demux(): %s", name_.c_str(), ex.what());
  188. status_ = StreamStatus::ERROR;
  189. break;
  190. }
  191. if (!demux_ok || packet_size <= 0 || !running_) // Check running_ again after potentially blocking demux call
  192. {
  193. PLOGI.printf("StreamNode [%s]: Demuxing finished or failed (packet_size: %d, running: %s).",
  194. name_.c_str(), packet_size, running_ ? "true":"false");
  195. status_ = StreamStatus::CLOSED; // Assume normal end or recoverable error
  196. break; // Exit processing loop
  197. }
  198. // Decode the packet
  199. int ndecoded_frame = 0;
  200. try
  201. {
  202. ndecoded_frame = decoder_->decode(packet_data, packet_size, pts);
  203. }
  204. catch (const std::exception& ex)
  205. {
  206. PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->decode(): %s", name_.c_str(), ex.what());
  207. status_ = StreamStatus::ERROR;
  208. break;
  209. }
  210. if (ndecoded_frame < 0)
  211. {
  212. PLOGE.printf("StreamNode [%s] Error: Decoder returned error (%d).", name_.c_str(), ndecoded_frame);
  213. status_ = StreamStatus::ERROR; // Treat decoder error as critical
  214. break; // Exit processing loop
  215. }
  216. // Process decoded frames
  217. for(int i = 0; i < ndecoded_frame; ++i)
  218. {
  219. // Timer timer("StreamNode GPU Frame"); // If using Timer utility
  220. uint8_t* frame_data = nullptr;
  221. int64_t frame_pts = 0;
  222. try
  223. {
  224. // Pass pointers to get the actual index and PTS for the current frame
  225. frame_data = decoder_->get_frame(&frame_pts, &frame_index);
  226. }
  227. catch (const std::exception& ex)
  228. {
  229. PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->get_frame(): %s", name_.c_str(), ex.what());
  230. status_ = StreamStatus::ERROR;
  231. ndecoded_frame = 0; // Stop processing frames from this packet
  232. break; // Break inner frame loop
  233. }
  234. if (!frame_data)
  235. {
  236. PLOGE.printf("StreamNode [%s] Error: Decoder returned null frame data for frame %d.", name_.c_str(), i);
  237. status_ = StreamStatus::ERROR; // Treat null frame data as error
  238. ndecoded_frame = 0; // Stop processing frames from this packet
  239. break; // Break inner frame loop
  240. }
  241. // Update overall frame count and check skip logic
  242. frame_count_++;
  243. if (frame_count_ % skip_frame_ != 0)
  244. {
  245. continue; // Skip this decoded frame
  246. }
  247. cv::Mat frame_gpu(decoder_->get_height(), decoder_->get_width(), CV_8UC3, frame_data);
  248. // Create metadata and copy the frame data
  249. auto metaData = std::make_shared<meta::MetaData>();
  250. metaData->image = frame_gpu.clone(); // CLONE is crucial here!
  251. metaData->from = name_;
  252. send_output_data(metaData);
  253. }
  254. if (status_ == StreamStatus::ERROR)
  255. {
  256. break;
  257. }
  258. };
  259. PLOGI.printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).",
  260. name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);
  261. }
  262. } // namespace GNode