// base.hpp

  1. #ifndef BASE_HPP__
  2. #define BASE_HPP__
  3. #include "common/meta.hpp"
  4. #include "common/queue.hpp"
  5. #include "common/data.hpp"
  6. #include <queue>
  7. #include <unordered_map>
  8. #include <string>
  9. #include <thread>
  10. #include <mutex>
  11. #include <condition_variable>
  12. #include <memory>
  13. #include <atomic>
  14. // 日志库
  15. #include "plog/Log.h"
  16. #include "plog/Initializers/RollingFileInitializer.h"
  17. namespace GNode
  18. {
  /// Role of a node within the processing graph.
  /// Unscoped on purpose: the enumerators are referenced unqualified
  /// (e.g. `SRC_NODE`) and keep their implicit values 0, 1, 2.
  enum NODE_TYPE {
    SRC_NODE, // input (source) node
    MID_NODE, // intermediate node
    DES_NODE, // output (destination) node
  };
  24. class BaseNode : public std::enable_shared_from_this<BaseNode>
  25. {
  26. public:
  27. BaseNode() = delete;
  28. BaseNode(const std::string& name, NODE_TYPE type);
  29. virtual ~BaseNode();
  30. virtual void work() = 0;
  31. void start();
  32. void stop();
  33. inline void add_input_buffer(const std::string& name, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>> buffer)
  34. {
  35. std::unique_lock<std::mutex> lock(mutex_);
  36. std::weak_ptr<BaseNode> weak_self = shared_from_this();
  37. // 设置回调,当数据push时通知当前节点
  38. // 有数据到当前节点的时候才触发条件变量
  39. buffer->set_push_callback([weak_self]() {
  40. if (auto self = weak_self.lock()) {
  41. self->cond_var_->notify_one();
  42. }
  43. });
  44. input_buffers_[name] = buffer;
  45. }
  46. inline void add_output_buffer(const std::string& name, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>> buffer)
  47. {
  48. std::unique_lock<std::mutex> lock(mutex_);
  49. output_buffers_[name] = buffer;
  50. }
  51. inline void del_input_buffer(const std::string& name)
  52. {
  53. std::unique_lock<std::mutex> lock(mutex_);
  54. if (input_buffers_.find(name) == input_buffers_.end())
  55. {
  56. return;
  57. }
  58. input_buffers_.erase(name);
  59. }
  60. inline void del_output_buffer(const std::string& name)
  61. {
  62. std::unique_lock<std::mutex> lock(mutex_);
  63. if (output_buffers_.find(name) == output_buffers_.end())
  64. {
  65. return;
  66. }
  67. output_buffers_.erase(name);
  68. }
  69. inline bool is_running()
  70. {
  71. return running_;
  72. }
  73. inline std::string get_name()
  74. {
  75. return name_;
  76. }
  77. protected:
  78. std::string name_;
  79. NODE_TYPE type_;
  80. std::thread worker_thread_;
  81. std::mutex mutex_;
  82. std::shared_ptr<std::condition_variable> cond_var_ =
  83. std::make_shared<std::condition_variable>();
  84. std::atomic<bool> running_{false};
  85. std::unordered_map<std::string, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>>> input_buffers_;
  86. std::unordered_map<std::string, std::shared_ptr<SharedQueue<std::shared_ptr<meta::MetaData>>>> output_buffers_;
  87. };
  88. static inline void LinkNode(const std::shared_ptr<BaseNode> &front,
  89. const std::shared_ptr<BaseNode> &back, int queue_size = 100, OverflowStrategy strategy = OverflowStrategy::Block)
  90. {
  91. PLOGI.printf("Link Node %s --> %s", front->get_name().c_str(), back->get_name().c_str());
  92. auto queue = std::make_shared<SharedQueue<std::shared_ptr<meta::MetaData>>>();
  93. back->add_input_buffer(front->get_name(), queue);
  94. front->add_output_buffer(back->get_name(), queue);
  95. }
  96. }
  97. #endif // BASE_HPP__