|
@@ -1,4 +1,5 @@
|
|
|
#include "pipeline/pipeline.hpp"
|
|
|
+#include <yaml-cpp/yaml.h>
|
|
|
#include <fstream>
|
|
|
|
|
|
|
|
@@ -159,6 +160,10 @@ void PipelineManager::create_from_json(const std::string& json_path)
|
|
|
else if (node_type == "Drawer")
|
|
|
{
|
|
|
new_node = std::make_shared<GNode::DrawNode>(node_id);
|
|
|
+ std::shared_ptr<meta::DrawConfigData> config_data = std::make_shared<meta::DrawConfigData>();
|
|
|
+ config_data->show_final_result = true;
|
|
|
+ config_data->show_final_result = false;
|
|
|
+ new_node->set_config_data(config_data);
|
|
|
}
|
|
|
else if (node_type == "Push")
|
|
|
{
|
|
@@ -210,4 +215,238 @@ void PipelineManager::create_from_json(const std::string& json_path)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void PipelineManager::create_from_yaml(const std::string& yaml_path) {
|
|
|
+ YAML::Node config;
|
|
|
+ try {
|
|
|
+ config = YAML::LoadFile(yaml_path);
|
|
|
+ } catch (const YAML::Exception& e) {
|
|
|
+ throw std::runtime_error("Failed to load or parse YAML file '" + yaml_path + "': " + e.what());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 1. Load Shared Models
|
|
|
+ if (config["models"] && config["models"].IsMap()) {
|
|
|
+ for (YAML::const_iterator it = config["models"].begin(); it != config["models"].end(); ++it) {
|
|
|
+ std::string model_id = it->first.as<std::string>();
|
|
|
+ const YAML::Node& model_config = it->second;
|
|
|
+
|
|
|
+ try {
|
|
|
+ // Check required fields exist before accessing
|
|
|
+ if (!model_config["model_path"] || !model_config["model_type"] || !model_config["names"] || !model_config["gpu_id"]) {
|
|
|
+ throw std::runtime_error("Model '" + model_id + "' is missing required fields (model_path, model_type, names, gpu_id).");
|
|
|
+ }
|
|
|
+
|
|
|
+ std::string path = model_config["model_path"].as<std::string>();
|
|
|
+ std::string type_str = model_config["model_type"].as<std::string>();
|
|
|
+ std::vector<std::string> names = model_config["names"].as<std::vector<std::string>>();
|
|
|
+ int gpu_id = model_config["gpu_id"].as<int>();
|
|
|
+ // Use .as<T>(defaultValue) for optional fields
|
|
|
+ float conf_thresh = model_config["confidence_threshold"].as<float>(0.25f);
|
|
|
+ float nms_thresh = model_config["nms_threshold"].as<float>(0.45f);
|
|
|
+
|
|
|
+ ModelType model_type_enum = string_to_model_type(type_str);
|
|
|
+
|
|
|
+ // Load the model using your load function
|
|
|
+ std::shared_ptr<Infer> model_instance = load(
|
|
|
+ path, model_type_enum, names, gpu_id, conf_thresh, nms_thresh);
|
|
|
+
|
|
|
+ if (!model_instance) {
|
|
|
+ throw std::runtime_error("Load function returned null for model: " + model_id);
|
|
|
+ }
|
|
|
+
|
|
|
+ shared_models_[model_id] = model_instance;
|
|
|
+ std::cout << "Loaded model: " << model_id << std::endl;
|
|
|
+
|
|
|
+ } catch (const YAML::Exception& e) {
|
|
|
+ throw std::runtime_error("YAML parsing error processing model '" + model_id + "': " + e.what());
|
|
|
+ } catch (const std::exception& e) {
|
|
|
+ // Catch other exceptions like runtime_error from string_to_model_type or load
|
|
|
+ throw std::runtime_error("Error processing model '" + model_id + "': " + e.what());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ std::cout << "No 'models' section found in YAML or it's not a map." << std::endl;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 2. Create Pipelines
|
|
|
+ if (config["pipelines"] && config["pipelines"].IsSequence()) {
|
|
|
+ for (const auto& pipeline_config : config["pipelines"]) { // Iterate through sequence items
|
|
|
+ std::string current_pipeline_id = "UNKNOWN"; // Default in case ID is missing
|
|
|
+ try {
|
|
|
+ if (!pipeline_config["pipeline_id"]) {
|
|
|
+ throw std::runtime_error("Pipeline entry missing 'pipeline_id'.");
|
|
|
+ }
|
|
|
+ PipelineInstance current_pipeline;
|
|
|
+ current_pipeline.pipeline_id = pipeline_config["pipeline_id"].as<std::string>();
|
|
|
+ current_pipeline_id = current_pipeline.pipeline_id; // Update for error reporting
|
|
|
+ current_pipeline.description = pipeline_config["description"].as<std::string>(""); // Optional description
|
|
|
+
|
|
|
+ std::cout << "Creating pipeline: " << current_pipeline.pipeline_id << std::endl;
|
|
|
+
|
|
|
+ // Temporary map to hold nodes of the current pipeline for linking (not strictly needed with sequential linking)
|
|
|
+ // std::unordered_map<std::string, std::shared_ptr<GNode::BaseNode>> current_pipeline_nodes_map;
|
|
|
+
|
|
|
+ if (pipeline_config["nodes"] && pipeline_config["nodes"].IsSequence()) {
|
|
|
+ for (const auto& node_config : pipeline_config["nodes"]) {
|
|
|
+ std::string node_id = "UNKNOWN";
|
|
|
+ std::string node_type = "UNKNOWN";
|
|
|
+ try {
|
|
|
+ if (!node_config["node_id"] || !node_config["node_type"] || !node_config["params"]) {
|
|
|
+ throw std::runtime_error("Node entry missing required fields (node_id, node_type, params).");
|
|
|
+ }
|
|
|
+ node_id = node_config["node_id"].as<std::string>();
|
|
|
+ node_type = node_config["node_type"].as<std::string>();
|
|
|
+ const YAML::Node& params = node_config["params"];
|
|
|
+
|
|
|
+ if (!params.IsMap()) {
|
|
|
+ throw std::runtime_error("Node '" + node_id + "' has invalid 'params' (must be a map).");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ std::shared_ptr<GNode::BaseNode> new_node = nullptr;
|
|
|
+
|
|
|
+ // --- Instantiate Node based on type ---
|
|
|
+ if (node_type == "Source") {
|
|
|
+ if (!params["stream_url"] || !params["gpu_id"] || !params["decode_type"] ) {
|
|
|
+ throw std::runtime_error("Source node '" + node_id + "' missing required params (stream_url, gpu_id, decode_type).");
|
|
|
+ }
|
|
|
+ std::string url = params["stream_url"].as<std::string>();
|
|
|
+ int gpu_id = params["gpu_id"].as<int>();
|
|
|
+ std::string decode_str = params["decode_type"].as<std::string>();
|
|
|
+ int skip = params["skip_frame"].as<int>(0); // Optional skip_frame
|
|
|
+
|
|
|
+ GNode::DecodeType decode_type = string_to_decode_type(decode_str);
|
|
|
+ auto stream_node = std::make_shared<GNode::StreamNode>(node_id, url, gpu_id, decode_type);
|
|
|
+ stream_node->set_skip_frame(skip);
|
|
|
+ new_node = stream_node;
|
|
|
+ } else if (node_type == "Inference") {
|
|
|
+ if (!params["model_id"]) {
|
|
|
+ throw std::runtime_error("Inference node '" + node_id + "' missing required param 'model_id'.");
|
|
|
+ }
|
|
|
+ std::string model_id_ref = params["model_id"].as<std::string>();
|
|
|
+ if (shared_models_.find(model_id_ref) == shared_models_.end()) {
|
|
|
+ throw std::runtime_error("Model ID '" + model_id_ref + "' not found for node '" + node_id + "'");
|
|
|
+ }
|
|
|
+ std::shared_ptr<Infer> model_ptr = shared_models_.at(model_id_ref);
|
|
|
+ auto infer_node = std::make_shared<GNode::InferNode>(node_id);
|
|
|
+ infer_node->set_model_instance(model_ptr, model_ptr->get_gpu_id());
|
|
|
+ new_node = infer_node;
|
|
|
+ } else if (node_type == "Tracker") {
|
|
|
+ if (!params["track_name"]) {
|
|
|
+ throw std::runtime_error("Tracker node '" + node_id + "' missing required param 'track_name'.");
|
|
|
+ }
|
|
|
+ std::string track_name = params["track_name"].as<std::string>();
|
|
|
+ int track_frame = params["track_frame"].as<int>(30);
|
|
|
+ int track_dist = params["track_distance"].as<int>(30);
|
|
|
+ new_node = std::make_shared<GNode::TrackNode>(node_id, track_name, track_frame, track_dist);
|
|
|
+ } else if (node_type == "Analyzer") {
|
|
|
+ if (!params["algorithm_names"] || !params["algorithm_params_fence"]) {
|
|
|
+ throw std::runtime_error("Analyzer node '" + node_id + "' missing required params (algorithm_names, algorithm_params_fence).");
|
|
|
+ }
|
|
|
+ new_node = std::make_shared<GNode::AnalyzeNode>(node_id);
|
|
|
+ std::shared_ptr<meta::AnalyzeConfigData> config_data = std::make_shared<meta::AnalyzeConfigData>();
|
|
|
+ config_data->algorithm_names = params["algorithm_names"].as<std::vector<std::string>>();
|
|
|
+
|
|
|
+ const YAML::Node& fence_params = params["algorithm_params_fence"];
|
|
|
+ if (fence_params && fence_params.IsMap()) {
|
|
|
+ for (YAML::const_iterator fence_it = fence_params.begin(); fence_it != fence_params.end(); ++fence_it) {
|
|
|
+ std::string name = fence_it->first.as<std::string>();
|
|
|
+ const YAML::Node& points_node = fence_it->second; // Sequence of points
|
|
|
+
|
|
|
+ if (!points_node || !points_node.IsSequence()) {
|
|
|
+ std::cerr << "Warning: Invalid fence points format for '" << name << "' in node '" << node_id << "'. Expected a sequence." << std::endl;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ std::vector<std::vector<cv::Point>> multi_fence_points;
|
|
|
+ std::vector<cv::Point> fence_points;
|
|
|
+ for (const auto& point_node : points_node) { // Iterate through points in the sequence
|
|
|
+ if (point_node && point_node.IsMap() && point_node["x"] && point_node["y"]) {
|
|
|
+ try {
|
|
|
+ int x = point_node["x"].as<int>();
|
|
|
+ int y = point_node["y"].as<int>();
|
|
|
+ fence_points.emplace_back(x, y);
|
|
|
+ } catch (const YAML::Exception& e) {
|
|
|
+ std::cerr << "Warning: Failed to parse point for '" << name << "' in node '" << node_id << "': " << e.what() << std::endl;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ std::cerr << "Warning: Invalid point format encountered for '" << name << "' in node '" << node_id << "'." << std::endl;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!fence_points.empty()) {
|
|
|
+ // Replicate original logic: wrap the single fence in an outer vector
|
|
|
+ multi_fence_points.push_back(fence_points);
|
|
|
+ }
|
|
|
+ config_data->algorithm_params_fence[name] = multi_fence_points;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ std::cerr << "Warning: 'algorithm_params_fence' is missing or not a map for node '" << node_id << "'." << std::endl;
|
|
|
+ }
|
|
|
+ new_node->set_config_data(config_data);
|
|
|
+ } else if (node_type == "Drawer") {
|
|
|
+ new_node = std::make_shared<GNode::DrawNode>(node_id);
|
|
|
+ std::shared_ptr<meta::DrawConfigData> config_data = std::make_shared<meta::DrawConfigData>();
|
|
|
+ // Read optional param from YAML, default to false if missing
|
|
|
+ config_data->show_original_result = params["show_original_result"].as<bool>(false);
|
|
|
+ // Set default for the other flag (assuming it's not typically in config)
|
|
|
+ config_data->show_final_result = true; // Or false, based on desired default
|
|
|
+ new_node->set_config_data(config_data);
|
|
|
+ } else if (node_type == "Push") {
|
|
|
+ // Assuming HttpPushNode has no specific YAML params in this example
|
|
|
+ new_node = std::make_shared<GNode::HttpPushNode>(node_id);
|
|
|
+ } else if (node_type == "Recorder") {
|
|
|
+ if (!params["gst_pipeline"]) {
|
|
|
+ throw std::runtime_error("Recorder node '" + node_id + "' missing required param 'gst_pipeline'.");
|
|
|
+ }
|
|
|
+ std::string gst_pipeline_str = params["gst_pipeline"].as<std::string>();
|
|
|
+ int fps = params["fps"].as<int>(25);
|
|
|
+ new_node = std::make_shared<GNode::RecordNode>(node_id, gst_pipeline_str, fps);
|
|
|
+ } else {
|
|
|
+ throw std::runtime_error("Unknown node type '" + node_type + "'");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (new_node) {
|
|
|
+ current_pipeline.nodes.push_back(new_node);
|
|
|
+ // current_pipeline_nodes_map[node_id] = new_node; // Only needed if linking out of order
|
|
|
+ std::cout << " Created node: " << node_id << " (" << node_type << ")" << std::endl;
|
|
|
+ }
|
|
|
+ } catch (const YAML::Exception& e) {
|
|
|
+ throw std::runtime_error("YAML parsing error for node '" + node_id + "' in pipeline '" + current_pipeline_id + "': " + e.what());
|
|
|
+ } catch (const std::exception& e) {
|
|
|
+ throw std::runtime_error("Error creating node '" + node_id + "' (" + node_type + ") in pipeline '" + current_pipeline_id + "': " + e.what());
|
|
|
+ }
|
|
|
+ } // End loop nodes
|
|
|
+
|
|
|
+ // --- Link nodes within the current pipeline ---
|
|
|
+ if (current_pipeline.nodes.size() > 1) {
|
|
|
+ int max_queue_size = 20; // Or read from YAML if needed
|
|
|
+ OverflowStrategy strategy = OverflowStrategy::DiscardOldest; // Or read from YAML
|
|
|
+
|
|
|
+ for (size_t i = 0; i < current_pipeline.nodes.size() - 1; ++i) {
|
|
|
+ GNode::LinkNode(current_pipeline.nodes[i],
|
|
|
+ current_pipeline.nodes[i + 1],
|
|
|
+ max_queue_size,
|
|
|
+ strategy);
|
|
|
+ std::cout << " Linked node " << current_pipeline.nodes[i]->get_id()
|
|
|
+ << " -> " << current_pipeline.nodes[i+1]->get_id() << std::endl;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ std::cout << " Pipeline '" << current_pipeline.pipeline_id << "' has no 'nodes' sequence." << std::endl;
|
|
|
+ }
|
|
|
+ configured_pipelines_.push_back(std::move(current_pipeline));
|
|
|
+
|
|
|
+ } catch (const YAML::Exception& e) {
|
|
|
+ // Catch YAML errors during pipeline-level parsing
|
|
|
+ throw std::runtime_error("YAML parsing error processing pipeline '" + current_pipeline_id + "': " + e.what());
|
|
|
+ } catch (const std::exception& e) {
|
|
|
+ // Catch other errors during pipeline processing
|
|
|
+ throw std::runtime_error("Error processing pipeline '" + current_pipeline_id + "': " + e.what());
|
|
|
+ }
|
|
|
+ } // End loop pipelines
|
|
|
+ } else {
|
|
|
+ std::cout << "No 'pipelines' section found in YAML or it's not a sequence." << std::endl;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
}
|