123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- #ifndef BASE_HPP__
- #define BASE_HPP__
- #include "common/meta.hpp"
- #include "common/queue.hpp"
- #include "common/data.hpp"
- #include <queue>
- #include <unordered_map>
- #include <string>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <memory>
- #include <atomic>
- // 日志库
- #include "plog/Log.h"
- #include "plog/Initializers/RollingFileInitializer.h"
- namespace GNode
- {
/// Role a node plays inside the processing graph.
/// Kept as an unscoped enum so existing unqualified uses (e.g. `SRC_NODE`) keep compiling.
enum NODE_TYPE
{
    SRC_NODE = 0, ///< Input (source) node
    MID_NODE = 1, ///< Intermediate node
    DES_NODE = 2, ///< Output (destination) node
};
- class BaseNode : public std::enable_shared_from_this<BaseNode>
- {
- public:
- BaseNode() = delete;
- BaseNode(const std::string& name, NODE_TYPE type);
- virtual ~BaseNode();
- virtual void work() = 0;
- void start();
- void stop();
- inline void add_input_buffer(const std::string& name, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>> buffer)
- {
- std::unique_lock<std::mutex> lock(mutex_);
- std::weak_ptr<BaseNode> weak_self = shared_from_this();
- // 设置回调,当数据push时通知当前节点
- // 有数据到当前节点的时候才触发条件变量
- buffer->set_push_callback([weak_self]() {
- if (auto self = weak_self.lock()) {
- self->cond_var_->notify_one();
- }
- });
- input_buffers_[name] = buffer;
- }
- inline void add_output_buffer(const std::string& name, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>> buffer)
- {
- std::unique_lock<std::mutex> lock(mutex_);
- output_buffers_[name] = buffer;
- }
- inline void del_input_buffer(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(mutex_);
- if (input_buffers_.find(name) == input_buffers_.end())
- {
- return;
- }
- input_buffers_.erase(name);
- }
- inline void del_output_buffer(const std::string& name)
- {
- std::unique_lock<std::mutex> lock(mutex_);
- if (output_buffers_.find(name) == output_buffers_.end())
- {
- return;
- }
- output_buffers_.erase(name);
- }
- inline bool is_running()
- {
- return running_;
- }
- inline std::string get_name()
- {
- return name_;
- }
- protected:
- std::string name_;
- NODE_TYPE type_;
- std::thread worker_thread_;
- std::mutex mutex_;
- std::shared_ptr<std::condition_variable> cond_var_ =
- std::make_shared<std::condition_variable>();
- std::atomic<bool> running_{false};
- std::unordered_map<std::string, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>>> input_buffers_;
- std::unordered_map<std::string, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>>> output_buffers_;
- };
- static inline void LinkNode(const std::shared_ptr<BaseNode> &front,
- const std::shared_ptr<BaseNode> &back, int queue_size = 100, OverflowStrategy strategy = OverflowStrategy::Block)
- {
- PLOGI.printf("Link Node %s --> %s", front->get_name().c_str(), back->get_name().c_str());
- auto queue = std::make_shared<SharedQueue<std::shared_ptr<meta::MetaData>>>();
- back->add_input_buffer(front->get_name(), queue);
- front->add_output_buffer(back->get_name(), queue);
- }
- }
- #endif // BASE_HPP__
|