123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- '''
- @File : infer.py
- @Time : 2024/10/17 11:18:00
- @Author : leon
- @Version : 1.0
- @Desc : None
- '''
- import cv2
- import torch
- import numpy as np
- from ultralytics import YOLO
- from shapely.geometry.polygon import Polygon
- from collections import namedtuple
- from typing import List, Tuple
- Box = namedtuple("Box", ["left", "top", "right", "bottom", "confidence", "label"])
- class PerspectiveMatrix:
- def __init__(self, matrix: np.ndarray, target: Tuple[int, int]) -> None:
- self.matrix = matrix
- self.target = target
-
- def __repr__(self):
- matrix_str = np.array2string(self.matrix, formatter={'float_kind': lambda x: f"{x:.2f}"})
- return f"PerspectiveMatrix(matrix={matrix_str}, target={self.target})"
-
- @staticmethod
- def perspective_matrix(src: List[Tuple[int, int]], dst: List[Tuple[int, int]], target : Tuple[int, int]) -> "PerspectiveMatrix":
- """
- 计算透视变换矩阵。
-
- 参数:
- - src: 源图像上的 4 个点 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
- - dst: 目标图像上的 4 个点 [(u1, v1), (u2, v2), (u3, v3), (u4, v4)]
-
- 返回:
- - PerspectiveMatrix: 透视变换矩阵
- """
- src_pts = np.array(src, dtype=np.float32)
- dst_pts = np.array(dst, dtype=np.float32)
- # 计算透视变换矩阵
- # matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
- # 单应性矩阵
- matrix, _ = cv2.findHomography(src_pts, dst_pts)
- return PerspectiveMatrix(matrix=matrix, target=target)
- class ImageTransformer:
- @staticmethod
- def PerspectiveTransform(
- image: np.ndarray,
- perspective_matrix: PerspectiveMatrix,
- flags=cv2.INTER_LINEAR,
- borderMode=cv2.BORDER_CONSTANT,
- borderValue=(114, 114, 114)
- ) -> np.ndarray:
- if image is None or perspective_matrix is None:
- raise ValueError("Input image and warpaffine_matrix cannot be None.")
- transformed_image = cv2.warpPerspective(
- image,
- perspective_matrix.matrix,
- perspective_matrix.target,
- flags=flags,
- borderMode=borderMode,
- borderValue=borderValue)
- return transformed_image
- class AreaManage(object):
- def __init__(self):
- self.door_areas = [
- # [(934, 143), (958, 130), (961, 165), (936, 181)],
- [(564, 399), (682, 432), (684, 528), (574, 493)]]
-
- self.door_perspective_matrix = []
- self.dst_door_points = [(0,0), (224, 0), (224, 224), (0, 224)]
- for p in self.door_areas:
- perspective_matrix = PerspectiveMatrix.perspective_matrix(p, self.dst_door_points, (224, 224))
- self.door_perspective_matrix.append(perspective_matrix)
- # 预设值的三个门对应的人的活动区域
- self.person_areas = [
- # [(900, 152), (914, 84), (637, 20), (604, 73)],
- [(860, 200), (958, 226), (687, 432), (586, 395)]]
-
- self.person_areas_polygon = [Polygon(area) for area in self.person_areas]
-
- def update(self, person_area):
- areas = []
- person_area_polygon = Polygon(person_area)
- for pap in self.person_areas_polygon:
- areas.append(person_area_polygon.intersection(pap).area)
- max_area = max(areas)
- if max_area > 0:
- max_idx = areas.index(max_area)
- self.person_areas_polygon[max_idx] = person_area_polygon
-
- def inner_area(self, person_polygon):
- for i in range(len(self.person_areas_polygon)):
- overlap_ratop = person_polygon.intersection(self.person_areas_polygon[i]).area / person_polygon.area
- if overlap_ratop > 0.4:
- return i
- return -1
-
- """
- 1. 人员检测
- 2. 检查区域内是否有人
- 3. 如果区域内有人,分类该区域的门是否关闭
- 4. 如果关闭,上报违章
- """
- class DoorInference(object):
- """
- human_model_path : 检测人的模型地址
- door_model_path : 门分类的模型地址
- person_areas : 电子围栏区域 [[(x,y),(x,y),(x,y),(x,y),...],[(x,y),(x,y),(x,y),(x,y),...],...]
- device_id : 显卡id, -1表示使用cpu
- confidence_threshold : 检测人的模型的阈值
- """
- def __init__(self, human_model_path, door_model_path, person_areas, device_id=0, confidence_threshold= 0.5) -> "DoorInference":
- self.device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() and device_id !=-1 else "cpu")
- self.confidence_threshold = confidence_threshold
- self.human_model = YOLO(human_model_path).to(self.device)
- self.door_model = YOLO(door_model_path).to(self.device)
- self.door_names = {0: 'block', 1: 'close', 2: 'open'}
- self.am = AreaManage()
- if person_areas:
- for person_area in person_areas:
- self.am.update(person_area)
- def __repr__(self):
- infer_str = f"DoorInference(device = {self.device})"
- return infer_str
-
- def rect(self, point_list):
- """
- 根据给定的点计算矩形框
- :param point_list: [(x1, y1), (x2, y2), ..., (xn, yn)] 多个点
- :return: 左上角和右下角的坐标,表示矩形的框
- """
- # 获取所有点的 x 和 y 坐标的最小和最大值
- min_x = min(point[0] for point in point_list)
- max_x = max(point[0] for point in point_list)
- min_y = min(point[1] for point in point_list)
- max_y = max(point[1] for point in point_list)
- # 计算矩形框的左上角和右下角
- left, top = min_x, min_y
- right, bottom = max_x, max_y
- return left, top ,right, bottom
- """
- 返回所有检测到的人
- """
- def person_detect(self, image):
- objs = []
- confs = []
- results = self.human_model(image, stream=False, classes=[0], conf=self.confidence_threshold, iou=0.3, imgsz=640)
- for result in results:
- boxes = result.boxes.cpu()
- for box in boxes:
- conf = box.conf.cpu().numpy().tolist()[0]
- left, top, right, bottom = box.xyxy.tolist()[0]
- objs.append([(left, top), (right, top), (right, bottom), (left, bottom)])
- confs.append(conf)
- return objs, confs
- """
- 判断门是否关闭
- """
- def is_door_close(self, image, index) -> bool:
- # 图片透视变换, 只需要门的区域
- # 比只使用矩形框更大程度的只保留门的区域
- door_image = ImageTransformer.PerspectiveTransform(image, self.am.door_perspective_matrix[index])
- # cv2.imshow('0',door_image)
- # cv2.waitKey(30000)
- res = self.door_model(door_image)
- class_idx = res[0].probs.top1
- class_conf = res[0].probs.top1conf.cpu().numpy().item()
- return class_idx == 1, class_conf
- """
- image 输入待识别图片
- 返回需要画框的门和人的坐标
- """
- def __call__(self, image):
- inner_person = {0:[], 1:[], 2:[]}
- person_boxes, person_confs = self.person_detect(image)
- for person_box, person_conf in zip(person_boxes, person_confs):
- person_polygon = Polygon(person_box)
- idx = self.am.inner_area(person_polygon)
- if idx == -1: continue
- inner_person[idx].append({"box" : person_box, "conf" : person_conf})
- result = []
- for i, persons in inner_person.items():
- if len(persons) == 0:
- continue
- close, class_conf = self.is_door_close(image, i)
- if close:
- left, top ,right, bottom = self.rect(self.am.door_areas[i])
- result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=class_conf, label="door_close"))
- for person in persons:
- left, top ,right, bottom = self.rect(person["box"])
- conf = person["conf"]
- result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=conf, label="person"))
- return result
|