#include "graph/graph.hpp" #include namespace Graph { using json = nlohmann::json; static ModelType string_to_model_type(const std::string& type_str) { if (type_str == "YOLOV5") return ModelType::YOLOV5; if (type_str == "YOLOV5SEG") return ModelType::YOLOV5SEG; if (type_str == "YOLO11") return ModelType::YOLO11; if (type_str == "YOLO11POSE") return ModelType::YOLO11POSE; if (type_str == "YOLO11SEG") return ModelType::YOLO11SEG; if (type_str == "DEPTH_ANYTHING") return ModelType::DEPTH_ANYTHING; throw std::runtime_error("Unknown model type string: " + type_str); } // Helper to map string to DecodeType static GNode::DecodeType string_to_decode_type(const std::string& type_str) { if (type_str == "GPU") return GNode::DecodeType::GPU; if (type_str == "CPU") return GNode::DecodeType::CPU; if (type_str == "FOLDER") return GNode::DecodeType::FOLDER; throw std::runtime_error("Unknown decode type string: " + type_str); } void Graph::create_from_json(const std::string& json_path) { std::ifstream json_file(json_path); if (!json_file.is_open()) { throw std::runtime_error("Failed to open JSON file: " + json_path); } nlohmann::json config; try { json_file >> config; } catch (const nlohmann::json::parse_error& e) { throw std::runtime_error("Failed to parse JSON: " + std::string(e.what())); } // shared_models_.clear(); // configured_pipelines_.clear(); // 2. Load Models if (config.contains("models")) { for (auto& [model_id, model_config] : config["models"].items()) { try { std::string path = model_config.at("model_path").get(); std::string type_str = model_config.at("model_type").get(); std::vector names = model_config.at("names").get>(); int gpu_id = model_config.at("gpu_id").get(); float conf_thresh = model_config.value("confidence_threshold", 0.25f); // Use .value for optional with default float nms_thresh = model_config.value("nms_threshold", 0.45f); ModelType model_type_enum = string_to_model_type(type_str); // Load the model using your load function std::shared_ptr model_instance = load( path, model_type_enum, names, gpu_id, conf_thresh, nms_thresh); if (!model_instance) { throw std::runtime_error("Failed to load model: " + model_id); } shared_models_[model_id] = model_instance; std::cout << "Loaded model: " << model_id << std::endl; } catch (const std::exception& e) { throw std::runtime_error("Error processing model '" + model_id + "': " + e.what()); } } } // 3. Create Pipelines if (config.contains("pipelines")) { for (const auto& pipeline_config : config["pipelines"]) { try { PipelineInstance current_pipeline; current_pipeline.pipeline_id = pipeline_config.at("pipeline_id").get(); current_pipeline.description = pipeline_config.value("description", ""); // Optional description std::cout << "Creating pipeline: " << current_pipeline.pipeline_id << std::endl; // Temporary map to hold nodes of the current pipeline for linking std::unordered_map> current_pipeline_nodes_map; if (pipeline_config.contains("nodes")) { for (const auto& node_config : pipeline_config["nodes"]) { std::string node_id = node_config.at("node_id").get(); std::string node_type = node_config.at("node_type").get(); const auto& params = node_config.at("params"); std::shared_ptr new_node = nullptr; // --- Instantiate Node based on type --- if (node_type == "Source") { std::string url = params.at("stream_url").get(); int gpu_id = params.at("gpu_id").get(); std::string decode_str = params.at("decode_type").get(); int skip = params.value("skip_frame", 0); // Optional skip_frame GNode::DecodeType decode_type = string_to_decode_type(decode_str); auto stream_node = std::make_shared(node_id, url, gpu_id, decode_type); stream_node->set_skip_frame(skip); new_node = stream_node; } else if (node_type == "Inference") { std::string model_id_ref = params.at("model_id").get(); if (shared_models_.find(model_id_ref) == shared_models_.end()) { throw std::runtime_error("Model ID '" + model_id_ref + "' not found for node '" + node_id + "'"); } std::shared_ptr model_ptr = shared_models_.at(model_id_ref); auto infer_node = std::make_shared(node_id); infer_node->set_model_instance(model_ptr, model_ptr->get_gpu_id()); new_node = infer_node; } else if (node_type == "Tracker") { std::string track_name = params.at("track_name").get(); int track_frame = params.value("track_frame", 30); int track_dist = params.value("track_distance", 30); new_node = std::make_shared(node_id, track_name, track_frame, track_dist); } else if (node_type == "Analyzer") { new_node = std::make_shared(node_id); } else if (node_type == "Drawer") { new_node = std::make_shared(node_id); } else if (node_type == "Push") { new_node = std::make_shared(node_id); } else if (node_type == "Recorder") { std::string gst_pipeline = params.at("gst_pipeline").get(); int fps = params.value("fps", 25); new_node = std::make_shared(node_id, gst_pipeline, fps); } else { throw std::runtime_error("Unknown node type '" + node_type + "' for node ID '" + node_id + "'"); } if (new_node) { current_pipeline.nodes.push_back(new_node); current_pipeline_nodes_map[node_id] = new_node; std::cout << " Created node: " << node_id << " (" << node_type << ")" << std::endl; } } // --- Link nodes within the current pipeline --- if (current_pipeline.nodes.size() > 1) { int max_queue_size = 100; OverflowStrategy strategy = OverflowStrategy::BlockTimeout; for (size_t i = 0; i < current_pipeline.nodes.size() - 1; ++i) { GNode::LinkNode(current_pipeline.nodes[i], current_pipeline.nodes[i + 1], max_queue_size, strategy); } } } configured_pipelines_.push_back(std::move(current_pipeline)); } catch (const std::exception& e) { std::string pipeline_id = pipeline_config.value("pipeline_id", "UNKNOWN"); throw std::runtime_error("Error processing pipeline '" + pipeline_id + "': " + e.what()); } } } } }