leon 3 minggu lalu
induk
melakukan
22f39c838f
5 mengubah file dengan 441 tambahan dan 0 penghapusan
  1. 1 0
      .gitignore
  2. 22 0
      Makefile
  3. 282 0
      infer.py
  4. 68 0
      logger.py
  5. 68 0
      stream.py

+ 1 - 0
.gitignore

@@ -2,3 +2,4 @@ images/
 result/
 __pycache__
 log
+models/

+ 22 - 0
Makefile

@@ -0,0 +1,22 @@
+start:
+	nohup python3 infer.py > /dev/null 2>&1 &
+	@sleep 1s
+	@ps -ef | grep infer.py
+
+debug:
+	python3 infer.py
+
+status:
+	@ps -ef | grep infer.py 
+
+
+log:
+	@tail -f log/runtime.log
+
+help:
+	@echo "make start   => start server"
+	@echo "make status  => check server status"
+	@echo "make log     => print log"
+	@echo "make help    => help"
+
+.PHONY: start status help log

+ 282 - 0
infer.py

@@ -0,0 +1,282 @@
+'''
+@File    :   infer.py
+@Time    :   2024/10/17 11:18:00
+@Author  :   leon 
+@Version :   1.0
+@Desc    :   None
+'''
+import cv2
+import torch
+import numpy as np
+from ultralytics import YOLO
+from shapely.geometry.polygon import Polygon
+
+from collections import namedtuple
+from typing import List, Tuple
+import requests
+
+from logger import logger
+from stream import StreamCapture 
+
+Box = namedtuple("Box", ["left", "top", "right", "bottom", "confidence", "label"])
+
+class PerspectiveMatrix:
+    def __init__(self, matrix: np.ndarray, target: Tuple[int, int]) -> None:
+        self.matrix = matrix
+        self.target = target
+    
+    def __repr__(self):
+        matrix_str = np.array2string(self.matrix, formatter={'float_kind': lambda x: f"{x:.2f}"})
+        return f"PerspectiveMatrix(matrix={matrix_str}, target={self.target})"
+    
+    @staticmethod
+    def perspective_matrix(src: List[Tuple[int, int]], dst: List[Tuple[int, int]], target : Tuple[int, int]) -> "PerspectiveMatrix":
+        """
+        计算透视变换矩阵。
+        
+        参数:
+        - src: 源图像上的 4 个点 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
+        - dst: 目标图像上的 4 个点 [(u1, v1), (u2, v2), (u3, v3), (u4, v4)]
+        
+        返回:
+        - PerspectiveMatrix: 透视变换矩阵
+        """
+        src_pts = np.array(src, dtype=np.float32)
+        dst_pts = np.array(dst, dtype=np.float32)
+
+        # 计算透视变换矩阵
+        # matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
+        # 单应性矩阵
+        matrix, _ = cv2.findHomography(src_pts, dst_pts)
+
+        return PerspectiveMatrix(matrix=matrix, target=target)
+
+class ImageTransformer:
+    @staticmethod
+    def PerspectiveTransform(
+        image: np.ndarray, 
+        perspective_matrix: PerspectiveMatrix,
+        flags=cv2.INTER_LINEAR,
+        borderMode=cv2.BORDER_CONSTANT,
+        borderValue=(114, 114, 114)
+    ) -> np.ndarray:
+        if image is None or perspective_matrix is None:
+            raise ValueError("Input image and warpaffine_matrix cannot be None.")
+
+        transformed_image = cv2.warpPerspective(
+            image, 
+            perspective_matrix.matrix, 
+            perspective_matrix.target, 
+            flags=flags,
+            borderMode=borderMode,
+            borderValue=borderValue)
+        return transformed_image
+
+class AreaManage(object):
+    def __init__(self):
+        self.door_areas = [
+            [(934, 143), (958, 130), (961, 165), (936, 181)],
+            [(564, 399), (682, 432), (684, 528), (574, 493)]]
+        
+        self.door_perspective_matrix = []
+
+        self.dst_door_points = [(0,0), (224, 0), (224, 224), (0, 224)]
+
+        for p in self.door_areas:
+            perspective_matrix = PerspectiveMatrix.perspective_matrix(p, self.dst_door_points, (224, 224))
+            self.door_perspective_matrix.append(perspective_matrix)
+
+        # 预设值的三个门对应的人的活动区域
+        self.person_areas = [
+            [(900, 152),  (914, 84), (637, 20), (604, 73)],
+            [(860, 200), (958, 226), (687, 432), (586, 395)]]
+        
+        self.person_areas_polygon = [Polygon(area) for area in self.person_areas]
+        
+
+    def update(self, person_area):
+        areas = []
+        person_area_polygon = Polygon(person_area)
+        for pap in self.person_areas_polygon:
+            areas.append(person_area_polygon.intersection(pap).area)
+        max_area = max(areas)
+        if max_area > 0:
+            max_idx = areas.index(max_area)
+            self.person_areas_polygon[max_idx] = person_area_polygon
+        
+    def inner_area(self, person_polygon):
+        for i in range(len(self.person_areas_polygon)):
+            if person_polygon.intersection(self.person_areas_polygon[i]).area / person_polygon.area > 0.5:
+                return i
+        return -1
+
+
+        
+
+"""
+1. 人员检测
+2. 检查区域内是否有人
+3. 如果区域内有人,分类该区域的门是否关闭
+4. 如果关闭,上报违章
+"""
+class DoorInference(object):
+    """
+    human_model_path : 检测人的模型地址
+    door_model_path  : 门分类的模型地址
+    person_areas     : 电子围栏区域 [[(x,y),(x,y),(x,y),(x,y),...],[(x,y),(x,y),(x,y),(x,y),...],...]
+    device_id        : 显卡id, -1表示使用cpu
+    confidence_threshold : 检测人的模型的阈值
+    """
+    def __init__(self, human_model_path, door_model_path, person_areas, device_id=0, confidence_threshold= 0.5) -> "DoorInference":
+        self.device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() and device_id !=-1 else "cpu")
+        self.confidence_threshold = confidence_threshold
+        self.human_model = YOLO(human_model_path).to(self.device)
+        self.door_model  = YOLO(door_model_path).to(self.device)
+
+        self.door_names  = {0: 'block', 1: 'close', 2: 'open'}
+
+        self.am = AreaManage()
+
+        if person_areas:
+            for person_area in person_areas:
+                self.am.update(person_area)
+
+    def __repr__(self):
+        infer_str = f"DoorInference(device = {self.device})"
+        return infer_str
+    
+    def rect(self, point_list):
+        """
+        根据给定的点计算矩形框
+        :param point_list: [(x1, y1), (x2, y2), ..., (xn, yn)] 多个点
+        :return: 左上角和右下角的坐标,表示矩形的框
+        """
+        # 获取所有点的 x 和 y 坐标的最小和最大值
+        min_x = min(point[0] for point in point_list)
+        max_x = max(point[0] for point in point_list)
+        min_y = min(point[1] for point in point_list)
+        max_y = max(point[1] for point in point_list)
+
+        # 计算矩形框的左上角和右下角
+        left, top = min_x, min_y
+        right, bottom = max_x, max_y
+
+        return left, top ,right, bottom
+
+    """
+    返回所有检测到的人
+    """
+    def person_detect(self, image):
+        objs  = []
+        confs = []
+        results = self.human_model(image, stream=False, classes=[4], conf=self.confidence_threshold, iou=0.3, imgsz=640)
+        for result in results:
+            boxes = result.boxes.cpu()
+            for box in boxes:
+                conf = box.conf.cpu().numpy().tolist()[0]
+                left, top, right, bottom = box.xyxy.tolist()[0]
+                objs.append([(left, top), (right, top), (right, bottom), (left, bottom)])
+                confs.append(conf)
+        return objs, confs
+
+    """
+    判断门是否关闭
+    """
+    def is_door_close(self, image, index) -> bool:
+        # 图片透视变换, 只需要门的区域
+        # 比只使用矩形框更大程度的只保留门的区域
+        door_image  = ImageTransformer.PerspectiveTransform(image, self.am.door_perspective_matrix[index])
+        # cv2.imshow('0',door_image)
+        # cv2.waitKey(30000)
+        res = self.door_model(door_image)
+        class_idx = res[0].probs.top1
+        class_conf = res[0].probs.top1conf.cpu().numpy()
+        return class_idx == 1, class_conf
+
+    """ 
+    image 输入待识别图片
+    返回需要画框的门和人的坐标
+    """
+    def __call__(self, image):
+        inner_person = {0:[], 1:[], 2:[]}
+        person_boxes, person_confs = self.person_detect(image)
+        
+        for person_box, person_conf in zip(person_boxes, person_confs):
+            person_polygon = Polygon(person_box)
+            idx = self.am.inner_area(person_polygon)
+            if idx == -1: continue
+            inner_person[idx].append({"box" : person_box, "conf" : person_conf})
+
+        result = []
+        for i, persons in inner_person.items():
+            if len(persons) == 0:
+                continue
+            close, class_conf = self.is_door_close(image, i)
+            if close:
+                left, top ,right, bottom = self.rect(self.am.door_areas[i])
+                result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=class_conf, label="door_close"))
+                for person in persons:
+                    left, top ,right, bottom = self.rect(person["box"])
+                    conf = person["conf"]
+                    result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=conf, label="person"))
+        return result
+    
+    
+if __name__ == "__main__":
+    logger.info("Start Server")
+    human_model_path = "models/work_clo_person_head_hat.pt"
+    door_model_path  = "models/door_classify.pt"
+    #image_path = "images/camera1_9ce1aca78db14c5807a271cb154fed5b.jpg"
+    #image = cv2.imread(image_path)
+    # logger.info(video)
+    # (222, 59), (432, 3), (528, 96), (318, 198) 这个区域是为了测试,画大了的
+    test_area = [[(222, 59), (432, 3), (528, 96), (318, 198)]]
+    instance = DoorInference(human_model_path, door_model_path, person_areas=None)
+    # 返回需要门和人
+    ip = '172.19.152.231'
+    channel = '45'
+    stream = StreamCapture(ip, channel)
+    posttime = 0
+    for frame, ret in stream():
+        if not ret:continue
+        image = frame.copy()
+        result = instance(image)
+        if len(result)>0 and time.time()-posttime>30:
+            videoTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
+            logger.info(videoTime)
+            logger.info(result)
+            for res in result:
+                cv2.rectangle(image, tuple(map(int, (res.left, res.top))), tuple(map(int, (res.right, res.bottom))), (255,0, 0), 4)
+            success, encoded_image = cv2.imencode('.jpg', image)
+            content = encoded_image.tobytes()
+            successori, encoded_imageori = cv2.imencode('.jpg',frame)
+            contentori = encoded_imageori.tobytes()
+            payload = {'channel': '45',
+                        'classIndex': '8',
+                        'ip': '172.19.152.231',
+                        'videoTime': videoTime,
+                        'videoUrl': video}
+            files = [
+                            ('file', (filename, content, 'image/jpeg')),
+                            ('oldFile', (filenameori, contentori, 'image/jpeg')),
+                        ]
+            try:
+                result = requests.post('http://172.19.145.197/open/api/operate/upload', data=payload, files=files)
+                logger.info(result)
+            except Exception:
+                logger.info('posterror')
+    logger.info("EXIT")
+    #cv2.imwrite("result/result.jpg", image)
+    
+
+"""
+1. 检测工服、安全帽等
+2. 检测人的模型
+3. 安全帽的分类
+
+
+逻辑
+1. 检测人
+2. 检测安全帽、工服、头  检测到人没有工服报警
+3. 安全帽头再分类
+"""

+ 68 - 0
logger.py

@@ -0,0 +1,68 @@
+from functools import wraps
+import os
+import datetime
+import loguru
+
+
+# 单例类的装饰器
+def singleton_class_decorator(cls):
+    """
+    装饰器,单例类的装饰器
+    """
+    _instance = {}
+
+    @wraps(cls)
+    def wrapper_class(*args, **kwargs):
+        if cls not in _instance:
+            _instance[cls] = cls(*args, **kwargs)
+        return _instance[cls]
+
+    return wrapper_class
+
+
+@singleton_class_decorator
+class Logger:
+    def __init__(self):
+        self.logger_add()
+
+    def get_project_path(self, project_path=None):
+        if project_path is None:
+            project_path = os.path.realpath('.')
+        return project_path
+
+    def get_log_path(self):
+        project_path = self.get_project_path()
+        project_log_dir = os.path.join(project_path, 'log')
+        # 统一使用 runtime.log 作为日志文件名
+        project_log_filename = 'runtime.log'  
+        project_log_path = os.path.join(project_log_dir, project_log_filename)
+        return project_log_path
+
+    def logger_add(self):
+        loguru.logger.add(
+            sink=self.get_log_path(),
+            rotation="50 MB",  # 每个日志文件最大10MB
+            retention=5,  # 最多保留5个日志文件
+            compression=None,
+            encoding="utf-8",
+            enqueue=True
+        )
+
+    @property
+    def get_logger(self):
+        return loguru.logger
+
+
+# 实例化日志类
+logger = Logger().get_logger
+
+
+if __name__ == '__main__':
+    logger.debug('调试代码')
+    logger.info('输出信息')
+    logger.success('输出成功')
+    logger.warning('错误警告')
+    logger.error('代码错误')
+    logger.critical('崩溃输出')
+
+    logger.info('----原始测试----')

+ 68 - 0
stream.py

@@ -0,0 +1,68 @@
+import os
+import cv2
+import time
+import requests
+from logger import logger
+
+
+class StreamCapture:
+    def __init__(self, ip, channel, reconnect_interval=5):
+        self.channel            = channel
+        self.ip                 = ip
+        self.reconnect_interval = reconnect_interval
+        self.cap                = None
+
+    def get_stream_url(self):
+        data = {
+            "channel": str(self.channel),
+            "ip":self.ip
+        }
+        url = requests.post(url='http://172.19.152.231/open/api/operate/previewURLs',data=data).json()['msg']
+        return url
+    
+    def get_frame(self):
+        if self.cap is None or not self.cap.isOpened():
+            return None, False
+        ret, frame = self.cap.read()
+
+        if not ret:
+            return None, False
+        return frame, ret
+
+    def reconnect(self):
+        stream_url = self.get_stream_url()
+        logger.info(f"Reconnecting... {stream_url}")
+        self.cap.release()
+        self.cap = cv2.VideoCapture(stream_url)
+
+        if not self.cap.isOpened():
+            logger.info("Reconnect failed")
+            return False
+        logger.info("Reconnect success")
+        return True
+
+    def __call__(self):
+        stream_url = self.get_stream_url()
+        logger.info(f"Connecting... {stream_url}")
+        self.cap = cv2.VideoCapture(stream_url)
+        if not self.cap.isOpened():
+            logger.info("Connect failed")
+            yield None, False
+        logger.info("Connect sucess")
+        
+        while True:
+            frame, sucess = self.get_frame()
+            if not sucess:
+                logger.info("Disconnect...")
+                while not self.reconnect():
+                    time.sleep(self.reconnect_interval)
+            yield frame, sucess
+        self.cap.release()
+
+if __name__ == "__main__":
+    stream = StreamCapture()
+    for frame, ret in stream():
+        if not ret:
+            print("Cannot Get Image")
+            continue
+