leon 2 달 전
부모
커밋
43bdb84ac8
7개의 변경된 파일235개의 추가작업 그리고 5개의 파일을 삭제
  1. 110 0
      src/common/threadPool.hpp
  2. 9 5
      src/main.cpp
  3. 0 0
      src/nodes/analyze/analyzeNode.cpp
  4. 22 0
      src/nodes/analyze/analyzeNode.hpp
  5. 28 0
      src/nodes/httpPush/httpPush.cpp
  6. 40 0
      src/nodes/httpPush/httpPush.hpp
  7. 26 0
      src/pipeline/pipiline.hpp

+ 110 - 0
src/common/threadPool.hpp

@@ -0,0 +1,110 @@
+#include <iostream>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <queue>
+#include <vector>
+#include <future>
+
+class ThreadPool
+{
+public:
+    ThreadPool(int num) : running_(true)
+    {
+        for (int i = 0; i < num; i++)
+        {
+            threads_.emplace_back([this](){
+                while (true)
+                {
+                    std::function<void()> task;
+                    {
+                        std::unique_lock<std::mutex> lock(mtx_);
+                        // 线程阻塞直到有任务(需要执行的函数)到任务队列中
+                        cv_.wait(lock, [this]{ return !tasks_.empty() || !running_; });
+                        // 如果队列空了并且线程池需要被停止了,直接返回,结束循环
+                        if (tasks_.empty() && !running_)
+                        {
+                            return;
+                        }
+                         // 从任务队列中取出任务
+                        task = std::move(tasks_.front());
+                        tasks_.pop();
+                    }
+                    // 这样通过 std::bind 创建的 task 对象实际上是一个可调用对象。
+                    // 当你调用 task() 时,它会使用绑定的 f 和 args... 来执行原始的函数或 Lambda 表达式。
+                    task();
+                }
+            });
+        }
+    }
+
+    // 析构函数
+    // 1. 设置 running_ 为false
+    // 2. 通知所有线程
+    // 3. 等待线程执行完成
+    ~ThreadPool()
+    {
+        // 将running_ 设置为std::atomic<bool>,可以不用手动加锁了
+        running_ = false;
+        cv_.notify_all();
+        for(auto& thread : threads_)
+        {
+            if (thread.joinable()) thread.join();
+        }
+    }
+
+    // 提交任务到任务队列中
+    // 1. 使用变长参数模板接收函数的所有参数
+    // 2. 使用std::forward 完美转发保证参数的左值右值状态, 避免无意间的拷贝或移动,从而提高了效率。
+    // 3. 通知线程有数据到队列中了
+    // F&& f, Args&&... args 这个是万能引用
+    template<typename Func, typename ...Args>
+    void submitTask(Func&& func, Args&& ... args)
+    {
+        // std::function<void()> 是一个通用的可调用对象包装器
+        // 它可以存储任何符合 void() 签名的可调用对象
+        // 这里是将 task 定义为一个没有返回值(void)且无参数的函数。
+        // std::bind 是一个用于创建“绑定”函数对象的标准库功能
+        // 它能够将一个函数(或者可调用对象)与固定的参数绑定,返回一个新的可调用对象。
+        std::function<void()> task = 
+            std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
+        {
+            std::unique_lock<std::mutex> lock(mtx_);
+            tasks_.emplace(task);
+        }
+        cv_.notify_one();
+    }
+
+    // 带有返回结果的submit函数
+    template<typename Func, typename ...Args>
+    auto submitTaskWithResult(Func&& func, Args&& ... args) -> std::future<decltype(func(args...))>
+    {
+        // 推导函数返回的类型,rtype代替函数返回类型
+        using rtype = decltype(func(args...));
+        // 使用std::packaged_task 接收 std::bind绑定的可调用对象
+        // 这里使用shard_ptr 是为了将task放入到队列中时, 不会被销毁
+        // 有可能放入到队列后还没有执行, 这里已经执行完了,导致task被销毁
+        std::shared_ptr<std::packaged_task<rtype()>> task_ptr = std::make_shared<std::packaged_task<rtype()>>(
+            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
+        auto fut = task_ptr->get_future();
+        {
+            std::unique_lock<std::mutex> lock(mtx_);
+            // 这个任务队列里面接收到一个lambda函数,这个函数做了执行task函数
+            // 保持和没有返回值的线程池保持一致, 使用lambda函数在做一层封装
+            // 不用在线程中解引用再执行
+            tasks_.emplace([task_ptr]{ (*task_ptr)(); });
+        }
+        cv_.notify_one();
+        return fut;
+    }
+
+private:
+    // 线程列表
+    std::vector<std::thread> threads_;
+    // 任务队列
+    std::queue<std::function<void()>> tasks_;
+    std::condition_variable cv_;
+    std::mutex mtx_;
+
+    std::atomic<bool> running_;
+};

+ 9 - 5
src/main.cpp

@@ -18,11 +18,15 @@ int main()
     infer_node->start();
     src_node->start();
     
-    while(true)
-    {
-        std::this_thread::sleep_for(std::chrono::seconds(1));
-    }
+    getchar();
     return 0;
 }
 
-// TODO: 单模型多路复用
+
+// TODO:创建pipeline类,能够打印pipeline的结构、状态、启动、停止、重启、添加节点、删除节点、连接节点、断开节点
+// TODO: 硬解码
+// TODO: 模型多路复用
+// TODO: 通过配置文件创建 pipeline
+// TODO: 多种推理框架支持 opencv tensorrt onnxruntime ...
+// TODO:日志系统
+// TODO: 结果推送接口 http mqtt ...

+ 0 - 0
src/nodes/analyze/analyzeNode.cpp


+ 22 - 0
src/nodes/analyze/analyzeNode.hpp

@@ -0,0 +1,22 @@
+#ifndef ANALYZENODE_HPP__
+#define ANALYZENODE_HPP__
+
+#include "nodes/base/base.hpp"
+#include <opencv2/opencv.hpp>
+
+namespace Node
+{
+
+class AnalyzeNode : public BaseNode
+{
+public:
+    AnalyzeNode() = delete;
+    AnalyzeNode(const std::string& name) : BaseNode(name, NODE_TYPE::MID_NODE) {}
+    virtual ~AnalyzeNode() { };
+
+    void work() override;
+};
+
+
+}   // namespace Node
+#endif // ANALYZENODE_HPP__

+ 28 - 0
src/nodes/httpPush/httpPush.cpp

@@ -0,0 +1,28 @@
+#include "nodes/base/base.hpp"
+#include "nodes/httpPush/httpPush.hpp"
+
+namespace Node
+{
+
+void HttpPush::work()
+{
+    printf("HttpPush %s\n", name_.c_str());
+    while (running_)
+    {
+        for (auto& input_buffer : input_buffers_)
+        {
+            std::shared_ptr<meta::MetaData> metaData;
+            if (!input_buffer.second->try_pop(metaData))
+            {
+                continue;
+            }
+            printf("Node %s get data from %s\n", name_.c_str(), input_buffer.first.c_str());
+            // do something
+            cv::Mat image = metaData->image;
+            std::string image_name = getTimeString() + ".jpg";
+            cv::imwrite(image_name, image);
+        }
+    }
+};
+
+}   // namespace Node

+ 40 - 0
src/nodes/httpPush/httpPush.hpp

@@ -0,0 +1,40 @@
+#ifndef HTTP_PUSH_HPP__
+#define HTTP_PUSH_HPP__ 
+#include "nodes/base/base.hpp"
+#include <opencv2/opencv.hpp>
+#include "common/threadPool.hpp"
+namespace Node
+{
+
+class HttpPush : public BaseNode
+{
+public:
+    HttpPush() = delete;
+    HttpPush(const std::string& name) : BaseNode(name, NODE_TYPE::DES_NODE) {}
+    HttpPush(const std::string& name, const std::string& ip, int port, const std::string& address)
+        : BaseNode(name, NODE_TYPE::DES_NODE), ip_(ip), port_(port), address_(address) {}
+
+    virtual ~HttpPush() { };
+
+    inline void set_ip(const std::string& ip) { ip_ = ip; }
+    inline void set_port(int port) { port_ = port; }
+    inline void set_address(const std::string& address) { address_ = address; }
+    // 初始化线程池
+    inline void init_thread_pool(int num) 
+    {
+        thread_pool_ = std::make_shared<ThreadPool>(num);
+    }
+
+    void work() override;
+
+private:
+    std::string ip_;
+    int port_;
+    std::string address_;
+    std::shared_ptr<ThreadPool> thread_pool_;
+    
+};
+
+}
+
+#endif // HTTP_PUSH_HPP__

+ 26 - 0
src/pipeline/pipiline.hpp

@@ -0,0 +1,26 @@
+#ifndef PIPELINE_HPP__
+#define PIPELINE_HPP__
+
+#include <string>
+
+
+namespace pip
+{
+
+class Pipeline
+{
+public:
+    Pipeline() = delete;
+    Pipeline(const std::string& name) : name_(name) {}
+
+    void start();
+    void stop();
+
+private:
+    std::string name_;
+    bool running_ = false;
+};
+
+}
+
+#endif // PIPELINE_HPP__