// Base building blocks of the GNode node graph: the node-kind enum, the
// abstract BaseNode class, and the LinkNode helper that connects two nodes
// through a shared queue.
//
// NOTE(review): in this copy of the file every `<...>` span has been lost
// (header names of several #include directives and all template argument
// lists). The code tokens below are left exactly as found; restore the
// missing arguments from the original source before compiling.
#ifndef BASE_HPP__
#define BASE_HPP__

#include "common/meta.hpp"
#include "common/queue.hpp"
#include "common/data.hpp"

// NOTE(review): the header names of the following eight directives are
// missing in this copy — presumably the standard headers for the std:: types
// used below (string, memory, thread, mutex, ...); confirm against the
// original file.
#include
#include
#include
#include
#include
#include
#include
#include

// Logging library
#include "plog/Log.h"
#include "plog/Initializers/RollingFileInitializer.h"

namespace GNode {

// Kind of a node, passed to the BaseNode constructor.
enum NODE_TYPE {
    SRC_NODE, // input node
    MID_NODE, // intermediate node
    DES_NODE, // output node
};

// Abstract base class of a graph node.
//
// A node has a name, a NODE_TYPE, a worker thread, and two name-keyed maps of
// shared buffers (inputs and outputs). Derived classes implement work().
// The constructor, destructor, start() and stop() are only declared here;
// their definitions are not visible in this file.
//
// NOTE(review): the base class template argument was lost; presumably
// std::enable_shared_from_this<BaseNode> — confirm.
class BaseNode : public std::enable_shared_from_this {
public:
    // A node cannot be default-constructed; a name and a type are required.
    BaseNode() = delete;
    BaseNode(const std::string& name, NODE_TYPE type);
    virtual ~BaseNode();

    // Node-specific processing; must be overridden by derived classes.
    virtual void work() = 0;

    // Defined elsewhere. Presumably they launch / shut down worker_thread_ —
    // verify against the implementation file.
    void start();
    void stop();

    // Registers `buffer` as an input of this node under `name`, replacing any
    // buffer previously stored under the same name.
    //
    // Also installs a push callback on the buffer that calls notify_one() on
    // this node's condition variable.
    //
    // NOTE(review): `buffer` is dereferenced without a null check, and
    // shared_from_this() requires that this node is already owned by a
    // std::shared_ptr when this method is called.
    inline void add_input_buffer(const std::string& name, std::shared_ptr>> buffer) {
        std::unique_lock lock(mutex_);
        // Only a weak reference is captured by the callback, so the callback
        // becomes a no-op once the node has been destroyed.
        std::weak_ptr weak_self = shared_from_this();
        // Set a callback that notifies the current node when data is pushed.
        // The condition variable is only triggered when data arrives at the
        // current node.
        buffer->set_push_callback([weak_self]() {
            if (auto self = weak_self.lock()) {
                self->cond_var_->notify_one();
            }
        });
        input_buffers_[name] = buffer;
    }

    // Registers `buffer` as an output of this node under `name`, replacing any
    // buffer previously stored under the same name.
    inline void add_output_buffer(const std::string& name, std::shared_ptr>> buffer) {
        std::unique_lock lock(mutex_);
        output_buffers_[name] = buffer;
    }

    // Removes the input buffer stored under `name`; does nothing if there is
    // no such entry.
    inline void del_input_buffer(const std::string& name) {
        std::unique_lock lock(mutex_);
        // NOTE(review): erase(name) on a missing key is already a no-op for a
        // std::unordered_map, so this lookup looks redundant.
        if (input_buffers_.find(name) == input_buffers_.end()) {
            return;
        }
        input_buffers_.erase(name);
    }

    // Removes the output buffer stored under `name`; does nothing if there is
    // no such entry.
    inline void del_output_buffer(const std::string& name) {
        std::unique_lock lock(mutex_);
        if (output_buffers_.find(name) == output_buffers_.end()) {
            return;
        }
        output_buffers_.erase(name);
    }

    // Returns the current value of the running_ flag.
    // NOTE(review): could be declared const.
    inline bool is_running() {
        return running_;
    }

    // Returns a copy of the node's name.
    // NOTE(review): could be declared const.
    inline std::string get_name() {
        return name_;
    }

protected:
    std::string name_;          // node name, returned by get_name()
    NODE_TYPE type_;            // node kind
    std::thread worker_thread_; // presumably runs work(); usage is not visible here
    std::mutex mutex_;          // held by the add_*/del_* buffer methods above
    // Notified by the push callback installed in add_input_buffer().
    // NOTE(review): template arguments lost; presumably a shared
    // std::condition_variable — confirm.
    std::shared_ptr cond_var_ = std::make_shared();
    // Flag reported by is_running(); starts out false.
    std::atomic running_{false};
    // Buffers keyed by the name passed to add_input_buffer()/add_output_buffer().
    // NOTE(review): key and element types were lost in this copy.
    std::unordered_map>>> input_buffers_;
    std::unordered_map>>> output_buffers_;
};

// Connects `front` to `back` through a newly created shared queue and logs
// the link. The queue is stored in `back`'s inputs under `front`'s name and in
// `front`'s outputs under `back`'s name.
//
// NOTE(review): `queue_size` and `strategy` are never used in this body — the
// queue is created with no constructor arguments. Confirm whether they should
// be forwarded to the queue. Both node pointers are dereferenced without a
// null check.
static inline void LinkNode(const std::shared_ptr &front, const std::shared_ptr &back, int queue_size = 100, OverflowStrategy strategy = OverflowStrategy::Block) {
    PLOGI.printf("Link Node %s --> %s",
        front->get_name().c_str(), back->get_name().c_str());
    auto queue = std::make_shared>>();
    back->add_input_buffer(front->get_name(), queue);
    front->add_output_buffer(back->get_name(), queue);
}

}

#endif // BASE_HPP__