streamNode.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. #include "nodes/stream/streamNode.hpp"
  2. #include "common/utils.hpp" // Assuming Timer might be here, if used
  3. #include <iostream> // For std::cerr
  4. #include <thread> // For std::this_thread::sleep_for
  5. #include <chrono> // For std::chrono::milliseconds
  6. #include <filesystem>
  7. namespace GNode
  8. {
  9. void StreamNode::close_stream() {
  10. PLOGI.printf("StreamNode [%s]: Closing stream...", name_.c_str());
  11. cap_.reset();
  12. decoder_.reset();
  13. demuxer_.reset();
  14. status_ = StreamStatus::CLOSED;
  15. frame_count_ = -1;
  16. }
  17. bool StreamNode::open_stream() {
  18. close_stream();
  19. PLOGI.printf("StreamNode [%s]: Attempting to open stream: %s", name_.c_str(), stream_url_.c_str());
  20. status_ = StreamStatus::CLOSED;
  21. if (decode_type_ == DecodeType::GPU)
  22. {
  23. demuxer_ = FFHDDemuxer::create_ffmpeg_demuxer(stream_url_);
  24. if (demuxer_ == nullptr)
  25. {
  26. PLOGE.printf("StreamNode [%s] Error: GPU demuxer creation failed for %s", name_.c_str(), stream_url_.c_str());
  27. status_ = StreamStatus::OPEN_FAILED;
  28. return false;
  29. }
  30. auto codec_id = demuxer_->get_video_codec();
  31. decoder_ = FFHDDecoder::create_cuvid_decoder(
  32. false, FFHDDecoder::ffmpeg2NvCodecId(codec_id), -1, gpu_id_, nullptr, nullptr, true
  33. );
  34. if (decoder_ == nullptr)
  35. {
  36. PLOGE.printf("StreamNode [%s] Error: GPU decoder creation failed for %s (Codec: %d)", name_.c_str(), stream_url_.c_str(), codec_id);
  37. demuxer_.reset(); // Clean up demuxer if decoder fails
  38. status_ = StreamStatus::OPEN_FAILED;
  39. return false;
  40. }
  41. PLOGI.printf("StreamNode [%s]: GPU Demuxer and Decoder created successfully.", name_.c_str());
  42. status_ = StreamStatus::OPENED;
  43. }
  44. else if (decode_type_ == DecodeType::CPU)
  45. {
  46. cap_ = std::make_shared<cv::VideoCapture>();
  47. if (!cap_->open(stream_url_))
  48. {
  49. PLOGI.printf("StreamNode [%s] Error: CPU cv::VideoCapture failed to open %s", name_.c_str(), stream_url_.c_str());
  50. cap_.reset(); // Release the failed object
  51. status_ = StreamStatus::OPEN_FAILED;
  52. return false;
  53. }
  54. if (!cap_->isOpened()) // Double check
  55. {
  56. PLOGE.printf("StreamNode [%s] Error: CPU cv::VideoCapture not opened after call for %s", name_.c_str(), stream_url_.c_str());
  57. cap_.reset();
  58. status_ = StreamStatus::OPEN_FAILED;
  59. return false;
  60. }
  61. PLOGI.printf("StreamNode [%s]: CPU cv::VideoCapture opened successfully.", name_.c_str());
  62. status_ = StreamStatus::OPENED;
  63. }
  64. else
  65. {
  66. status_ = StreamStatus::OPENED;
  67. return true;
  68. }
  69. frame_count_ = -1;
  70. return true;
  71. }
  72. void StreamNode::work()
  73. {
  74. PLOGI.printf("StreamNode [%s] starting work loop. Decode type: %s",
  75. name_.c_str(), (decode_type_ == DecodeType::GPU ? "GPU" : "CPU"));
  76. while (running_)
  77. {
  78. if (status_ != StreamStatus::OPENED)
  79. {
  80. PLOGI.printf("StreamNode [%s]: Stream not open (Status: %d). Attempting to open...",
  81. name_.c_str(), static_cast<int>(status_));
  82. if (open_stream())
  83. {
  84. PLOGI.printf("StreamNode [%s]: Stream opened successfully.", name_.c_str());
  85. }
  86. else
  87. {
  88. PLOGI.printf("StreamNode [%s]: Failed to open stream. Retrying in %d ms...",
  89. name_.c_str(), retry_delay_ms_);
  90. status_ = StreamStatus::OPEN_FAILED; // Ensure status reflects failure
  91. auto wakeUpTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(retry_delay_ms_);
  92. while (running_ && std::chrono::steady_clock::now() < wakeUpTime) {
  93. std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep in smaller chunks
  94. }
  95. if (!running_) break;
  96. }
  97. }
  98. if (status_ == StreamStatus::OPENED)
  99. {
  100. PLOGI.printf("StreamNode [%s]: Starting stream processing...", name_.c_str());
  101. if (decode_type_ == DecodeType::CPU)
  102. {
  103. process_stream_cpu();
  104. }
  105. else if (decode_type_ == DecodeType::GPU)
  106. {
  107. process_stream_gpu();
  108. }
  109. else
  110. {
  111. process_stream_folder();
  112. }
  113. PLOGI.printf("StreamNode [%s]: Stream processing finished or stopped (Status: %d).",
  114. name_.c_str(), static_cast<int>(status_));
  115. if (status_ == StreamStatus::CLOSED || status_ == StreamStatus::ERROR)
  116. {
  117. close_stream();
  118. PLOGI.printf("StreamNode [%s]: Stream closed or errored. Will attempt reconnection if running.", name_.c_str());
  119. }
  120. }
  121. else
  122. {
  123. PLOGD.printf("StreamNode [%s]: Unexpected status %d in work loop.", name_.c_str(), static_cast<int>(status_));
  124. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid tight loop on unexpected state
  125. }
  126. }
  127. PLOGI.printf("StreamNode [%s] work loop finished.", name_.c_str());
  128. close_stream();
  129. }
  130. void StreamNode::process_stream_folder()
  131. {
  132. std::vector<std::string> files;
  133. if (!std::filesystem::exists(stream_url_) && !std::filesystem::is_directory(stream_url_))
  134. {
  135. running_.exchange(false);
  136. }
  137. else
  138. {
  139. for (const auto& entry : std::filesystem::directory_iterator(stream_url_))
  140. {
  141. if (entry.is_regular_file())
  142. {
  143. files.push_back(entry.path().string());
  144. }
  145. }
  146. }
  147. for (const auto& file : files)
  148. {
  149. cv::Mat frame;
  150. try
  151. {
  152. frame = cv::imread(file);
  153. }
  154. catch (const cv::Exception& ex)
  155. {
  156. PLOGE.printf("StreamNode [%s] Error: Exception during cv::imread: %s", name_.c_str(), ex.what());
  157. continue;
  158. }
  159. if (frame.empty())
  160. {
  161. continue;
  162. }
  163. frame_count_++;
  164. if (frame_count_ % skip_frame_ != 0)
  165. {
  166. continue; // Skip frame
  167. }
  168. auto metaData = std::make_shared<meta::MetaData>();
  169. metaData->image = frame.clone();
  170. metaData->from = name_;
  171. send_output_data(metaData);
  172. }
  173. PLOGI.printf("StreamNode [%s]: Exiting FOLDER processing loop (Running: %s, Status: %d).",
  174. name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
  175. }
  176. void StreamNode::process_stream_cpu()
  177. {
  178. if (!cap_ || !cap_->isOpened()) {
  179. PLOGD.printf("StreamNode [%s] Error: process_stream_cpu called with closed/invalid VideoCapture.", name_.c_str());
  180. status_ = StreamStatus::ERROR; // Indicate an unexpected state
  181. return;
  182. }
  183. PLOGI.printf("StreamNode [%s]: Processing CPU stream...", name_.c_str());
  184. while (running_ && status_ == StreamStatus::OPENED)
  185. {
  186. cv::Mat frame;
  187. bool success = false;
  188. try {
  189. success = cap_->read(frame);
  190. } catch (const cv::Exception& ex) {
  191. PLOGE.printf("StreamNode [%s] Error: Exception during cv::VideoCapture::read(): %s", name_.c_str(), ex.what());
  192. status_ = StreamStatus::ERROR;
  193. break;
  194. }
  195. if (!success || frame.empty())
  196. {
  197. PLOGE.printf("StreamNode [%s]: Cannot read frame (End of stream or error).", name_.c_str());
  198. status_ = StreamStatus::CLOSED;
  199. break;
  200. }
  201. frame_count_++;
  202. if (frame_count_ % skip_frame_ != 0)
  203. {
  204. continue; // Skip frame
  205. }
  206. auto metaData = std::make_shared<meta::MetaData>();
  207. metaData->image = frame.clone();
  208. metaData->from = name_;
  209. send_output_data(metaData);
  210. }
  211. PLOGI.printf("StreamNode [%s]: Exiting CPU processing loop (Running: %s, Status: %d).",
  212. name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_));
  213. }
  214. void StreamNode::process_stream_gpu()
  215. {
  216. if (!demuxer_ || !decoder_) {
  217. PLOGE.printf("StreamNode [%s] Error: process_stream_gpu called with invalid demuxer/decoder.", name_.c_str());
  218. status_ = StreamStatus::ERROR;
  219. return;
  220. }
  221. PLOGI.printf("StreamNode [%s]: Processing GPU stream...", name_.c_str());
  222. uint8_t* packet_data = nullptr;
  223. int packet_size = 0;
  224. int64_t pts = 0;
  225. // Send extradata once (important for some codecs)
  226. demuxer_->get_extra_data(&packet_data, &packet_size);
  227. if (packet_size > 0) {
  228. PLOGI.printf("StreamNode [%s]: Sending %d bytes of extradata to decoder.", name_.c_str(), packet_size);
  229. decoder_->decode(packet_data, packet_size);
  230. } else {
  231. PLOGI.printf("StreamNode [%s]: No extradata found or needed.", name_.c_str());
  232. }
  233. unsigned int frame_index = 0; // Consider if this should be member if state needs preserving across reconnects
  234. while(running_ && status_ == StreamStatus::OPENED)
  235. {
  236. // Demux next packet
  237. bool demux_ok = false;
  238. try
  239. {
  240. demux_ok = demuxer_->demux(&packet_data, &packet_size, &pts);
  241. }
  242. catch (const std::exception& ex) { // Catch potential exceptions from demuxer implementation
  243. PLOGE.printf("StreamNode [%s] Error: Exception during demuxer_->demux(): %s", name_.c_str(), ex.what());
  244. status_ = StreamStatus::ERROR;
  245. break;
  246. }
  247. if (!demux_ok || packet_size <= 0 || !running_) // Check running_ again after potentially blocking demux call
  248. {
  249. PLOGI.printf("StreamNode [%s]: Demuxing finished or failed (packet_size: %d, running: %s).",
  250. name_.c_str(), packet_size, running_ ? "true":"false");
  251. status_ = StreamStatus::CLOSED; // Assume normal end or recoverable error
  252. break; // Exit processing loop
  253. }
  254. // Decode the packet
  255. int ndecoded_frame = 0;
  256. try
  257. {
  258. ndecoded_frame = decoder_->decode(packet_data, packet_size, pts);
  259. }
  260. catch (const std::exception& ex)
  261. {
  262. PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->decode(): %s", name_.c_str(), ex.what());
  263. status_ = StreamStatus::ERROR;
  264. break;
  265. }
  266. if (ndecoded_frame < 0)
  267. {
  268. PLOGE.printf("StreamNode [%s] Error: Decoder returned error (%d).", name_.c_str(), ndecoded_frame);
  269. status_ = StreamStatus::ERROR; // Treat decoder error as critical
  270. break; // Exit processing loop
  271. }
  272. // Process decoded frames
  273. for(int i = 0; i < ndecoded_frame; ++i)
  274. {
  275. // Timer timer("StreamNode GPU Frame"); // If using Timer utility
  276. uint8_t* frame_data = nullptr;
  277. int64_t frame_pts = 0;
  278. try
  279. {
  280. // Pass pointers to get the actual index and PTS for the current frame
  281. frame_data = decoder_->get_frame(&frame_pts, &frame_index);
  282. }
  283. catch (const std::exception& ex)
  284. {
  285. PLOGE.printf("StreamNode [%s] Error: Exception during decoder_->get_frame(): %s", name_.c_str(), ex.what());
  286. status_ = StreamStatus::ERROR;
  287. ndecoded_frame = 0; // Stop processing frames from this packet
  288. break; // Break inner frame loop
  289. }
  290. if (!frame_data)
  291. {
  292. PLOGE.printf("StreamNode [%s] Error: Decoder returned null frame data for frame %d.", name_.c_str(), i);
  293. status_ = StreamStatus::ERROR; // Treat null frame data as error
  294. ndecoded_frame = 0; // Stop processing frames from this packet
  295. break; // Break inner frame loop
  296. }
  297. // Update overall frame count and check skip logic
  298. frame_count_++;
  299. if (frame_count_ % skip_frame_ != 0)
  300. {
  301. continue; // Skip this decoded frame
  302. }
  303. cv::Mat frame_gpu(decoder_->get_height(), decoder_->get_width(), CV_8UC3, frame_data);
  304. // Create metadata and copy the frame data
  305. auto metaData = std::make_shared<meta::MetaData>();
  306. metaData->image = frame_gpu.clone(); // CLONE is crucial here!
  307. metaData->from = name_;
  308. send_output_data(metaData);
  309. }
  310. if (status_ == StreamStatus::ERROR)
  311. {
  312. break;
  313. }
  314. };
  315. PLOGI.printf("StreamNode [%s]: Exiting GPU processing loop (Running: %s, Status: %d, Total frames processed this session: %d).",
  316. name_.c_str(), running_ ? "true" : "false", static_cast<int>(status_), frame_count_ + 1);
  317. }
  318. } // namespace GNode