123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- '''
- @File : infer.py
- @Time : 2024/10/17 11:18:00
- @Author : leon
- @Version : 1.0
- @Desc : None
- '''
- import cv2
- import torch
- import time
- import numpy as np
- from ultralytics import YOLO
- from shapely.geometry.polygon import Polygon
- from collections import namedtuple
- from typing import List, Tuple
- import requests
- from logger import logger
- from stream import StreamCapture
- Box = namedtuple("Box", ["left", "top", "right", "bottom", "confidence", "label"])
- class PerspectiveMatrix:
- def __init__(self, matrix: np.ndarray, target: Tuple[int, int]) -> None:
- self.matrix = matrix
- self.target = target
-
- def __repr__(self):
- matrix_str = np.array2string(self.matrix, formatter={'float_kind': lambda x: f"{x:.2f}"})
- return f"PerspectiveMatrix(matrix={matrix_str}, target={self.target})"
-
- @staticmethod
- def perspective_matrix(src: List[Tuple[int, int]], dst: List[Tuple[int, int]], target : Tuple[int, int]) -> "PerspectiveMatrix":
- """
- 计算透视变换矩阵。
-
- 参数:
- - src: 源图像上的 4 个点 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
- - dst: 目标图像上的 4 个点 [(u1, v1), (u2, v2), (u3, v3), (u4, v4)]
-
- 返回:
- - PerspectiveMatrix: 透视变换矩阵
- """
- src_pts = np.array(src, dtype=np.float32)
- dst_pts = np.array(dst, dtype=np.float32)
- # 计算透视变换矩阵
- # matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
- # 单应性矩阵
- matrix, _ = cv2.findHomography(src_pts, dst_pts)
- return PerspectiveMatrix(matrix=matrix, target=target)
- class ImageTransformer:
- @staticmethod
- def PerspectiveTransform(
- image: np.ndarray,
- perspective_matrix: PerspectiveMatrix,
- flags=cv2.INTER_LINEAR,
- borderMode=cv2.BORDER_CONSTANT,
- borderValue=(114, 114, 114)
- ) -> np.ndarray:
- if image is None or perspective_matrix is None:
- raise ValueError("Input image and warpaffine_matrix cannot be None.")
- transformed_image = cv2.warpPerspective(
- image,
- perspective_matrix.matrix,
- perspective_matrix.target,
- flags=flags,
- borderMode=borderMode,
- borderValue=borderValue)
- return transformed_image
- class AreaManage(object):
- def __init__(self):
- self.door_areas = [
- [(934, 143), (958, 130), (961, 165), (936, 181)],
- [(564, 399), (682, 432), (684, 528), (574, 493)]]
-
- self.door_perspective_matrix = []
- self.dst_door_points = [(0,0), (224, 0), (224, 224), (0, 224)]
- for p in self.door_areas:
- perspective_matrix = PerspectiveMatrix.perspective_matrix(p, self.dst_door_points, (224, 224))
- self.door_perspective_matrix.append(perspective_matrix)
- # 预设值的三个门对应的人的活动区域
- self.person_areas = [
- [(900, 152), (914, 84), (637, 20), (604, 73)],
- [(860, 200), (958, 226), (687, 432), (586, 395)]]
-
- self.person_areas_polygon = [Polygon(area) for area in self.person_areas]
-
- def update(self, person_area):
- areas = []
- person_area_polygon = Polygon(person_area)
- for pap in self.person_areas_polygon:
- areas.append(person_area_polygon.intersection(pap).area)
- max_area = max(areas)
- if max_area > 0:
- max_idx = areas.index(max_area)
- self.person_areas_polygon[max_idx] = person_area_polygon
-
- def inner_area(self, person_polygon):
- for i in range(len(self.person_areas_polygon)):
- if person_polygon.intersection(self.person_areas_polygon[i]).area / person_polygon.area > 0.5:
- return i
- return -1
-
- """
- 1. 人员检测
- 2. 检查区域内是否有人
- 3. 如果区域内有人,分类该区域的门是否关闭
- 4. 如果关闭,上报违章
- """
- class DoorInference(object):
- """
- human_model_path : 检测人的模型地址
- door_model_path : 门分类的模型地址
- person_areas : 电子围栏区域 [[(x,y),(x,y),(x,y),(x,y),...],[(x,y),(x,y),(x,y),(x,y),...],...]
- device_id : 显卡id, -1表示使用cpu
- confidence_threshold : 检测人的模型的阈值
- """
- def __init__(self, human_model_path, door_model_path, person_areas, device_id=0, confidence_threshold= 0.5) -> "DoorInference":
- self.device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() and device_id !=-1 else "cpu")
- self.confidence_threshold = confidence_threshold
- self.human_model = YOLO(human_model_path).to(self.device)
- self.door_model = YOLO(door_model_path).to(self.device)
- self.door_names = {0: 'block', 1: 'close', 2: 'open'}
- self.am = AreaManage()
- if person_areas:
- for person_area in person_areas:
- self.am.update(person_area)
- def __repr__(self):
- infer_str = f"DoorInference(device = {self.device})"
- return infer_str
-
- def rect(self, point_list):
- """
- 根据给定的点计算矩形框
- :param point_list: [(x1, y1), (x2, y2), ..., (xn, yn)] 多个点
- :return: 左上角和右下角的坐标,表示矩形的框
- """
- # 获取所有点的 x 和 y 坐标的最小和最大值
- min_x = min(point[0] for point in point_list)
- max_x = max(point[0] for point in point_list)
- min_y = min(point[1] for point in point_list)
- max_y = max(point[1] for point in point_list)
- # 计算矩形框的左上角和右下角
- left, top = min_x, min_y
- right, bottom = max_x, max_y
- return left, top ,right, bottom
- """
- 返回所有检测到的人
- """
- def person_detect(self, image):
- objs = []
- confs = []
- results = self.human_model(image, stream=False, classes=[4], conf=self.confidence_threshold, iou=0.3, imgsz=640)
- for result in results:
- boxes = result.boxes.cpu()
- for box in boxes:
- conf = box.conf.cpu().numpy().tolist()[0]
- left, top, right, bottom = box.xyxy.tolist()[0]
- objs.append([(left, top), (right, top), (right, bottom), (left, bottom)])
- confs.append(conf)
- return objs, confs
- """
- 判断门是否关闭
- """
- def is_door_close(self, image, index) -> bool:
- # 图片透视变换, 只需要门的区域
- # 比只使用矩形框更大程度的只保留门的区域
- door_image = ImageTransformer.PerspectiveTransform(image, self.am.door_perspective_matrix[index])
- # cv2.imshow('0',door_image)
- # cv2.waitKey(30000)
- res = self.door_model(door_image)
- class_idx = res[0].probs.top1
- class_conf = res[0].probs.top1conf.cpu().numpy()
- return class_idx == 1, class_conf
- """
- image 输入待识别图片
- 返回需要画框的门和人的坐标
- """
- def __call__(self, image):
- inner_person = {0:[], 1:[], 2:[]}
- person_boxes, person_confs = self.person_detect(image)
-
- for person_box, person_conf in zip(person_boxes, person_confs):
- person_polygon = Polygon(person_box)
- idx = self.am.inner_area(person_polygon)
- if idx == -1: continue
- inner_person[idx].append({"box" : person_box, "conf" : person_conf})
- result = []
- for i, persons in inner_person.items():
- if len(persons) == 0:
- continue
- close, class_conf = self.is_door_close(image, i)
- if close:
- left, top ,right, bottom = self.rect(self.am.door_areas[i])
- result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=class_conf, label="door_close"))
- for person in persons:
- left, top ,right, bottom = self.rect(person["box"])
- conf = person["conf"]
- result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=conf, label="person"))
- return result
-
-
- if __name__ == "__main__":
- logger.info("====== Start Server =======")
- human_model_path = "models/work_clo_person_head_hat.pt"
- door_model_path = "models/door_classify.pt"
- #image_path = "images/camera1_9ce1aca78db14c5807a271cb154fed5b.jpg"
- #image = cv2.imread(image_path)
- # logger.info(video)
- # (222, 59), (432, 3), (528, 96), (318, 198) 这个区域是为了测试,画大了的
- test_area = [[(222, 59), (432, 3), (528, 96), (318, 198)]]
- instance = DoorInference(human_model_path, door_model_path, person_areas=None)
- # 返回需要门和人
- ip = '172.19.152.231'
- channel = '45'
- stream = StreamCapture(ip, channel)
- posttime = time.time() - 30
- frame = cv2.imread("inference/test.jpg")
- image = frame.copy()
- result = instance(image)
- if len(result) > 0 and time.time() - posttime > 30:
- try:
- posttime = time.time()
- videoTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
- fileTime = time.strftime('%Y-%m-%d-%H:%M:%S',time.localtime())
- filename = fileTime + ".jpg"
- filenameori = fileTime + "det.jpg"
- logger.info(videoTime)
- logger.info(result)
- for res in result:
- cv2.rectangle(image, tuple(map(int, (res.left, res.top))), tuple(map(int, (res.right, res.bottom))), (255,0, 0), 4)
- success, encoded_image = cv2.imencode('.jpg', image)
- content = encoded_image.tobytes()
- successori, encoded_imageori = cv2.imencode('.jpg',frame)
- contentori = encoded_imageori.tobytes()
- payload = {'channel': '45',
- 'classIndex': '8',
- 'ip': '172.19.152.231',
- 'videoTime': videoTime,
- 'videoUrl': stream.stream_url}
- files = [
- ('file', (filename, content, 'image/jpeg')),
- ('oldFile', (filenameori, contentori, 'image/jpeg')),
- ]
-
- result = requests.post('http://172.19.145.197/open/api/operate/upload', data=payload, files=files)
- logger.info(result)
- except Exception as error:
- logger.error('Error : ', str(error))
- logger.info("======= EXIT =======")
- #cv2.imwrite("result/result.jpg", image)
|