test.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. '''
  2. @File : infer.py
  3. @Time : 2024/10/17 11:18:00
  4. @Author : leon
  5. @Version : 1.0
  6. @Desc : None
  7. '''
  8. import cv2
  9. import torch
  10. import time
  11. import numpy as np
  12. from ultralytics import YOLO
  13. from shapely.geometry.polygon import Polygon
  14. from collections import namedtuple
  15. from typing import List, Tuple
  16. import requests
  17. from logger import logger
  18. from stream import StreamCapture
  19. Box = namedtuple("Box", ["left", "top", "right", "bottom", "confidence", "label"])
  20. class PerspectiveMatrix:
  21. def __init__(self, matrix: np.ndarray, target: Tuple[int, int]) -> None:
  22. self.matrix = matrix
  23. self.target = target
  24. def __repr__(self):
  25. matrix_str = np.array2string(self.matrix, formatter={'float_kind': lambda x: f"{x:.2f}"})
  26. return f"PerspectiveMatrix(matrix={matrix_str}, target={self.target})"
  27. @staticmethod
  28. def perspective_matrix(src: List[Tuple[int, int]], dst: List[Tuple[int, int]], target : Tuple[int, int]) -> "PerspectiveMatrix":
  29. """
  30. 计算透视变换矩阵。
  31. 参数:
  32. - src: 源图像上的 4 个点 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
  33. - dst: 目标图像上的 4 个点 [(u1, v1), (u2, v2), (u3, v3), (u4, v4)]
  34. 返回:
  35. - PerspectiveMatrix: 透视变换矩阵
  36. """
  37. src_pts = np.array(src, dtype=np.float32)
  38. dst_pts = np.array(dst, dtype=np.float32)
  39. # 计算透视变换矩阵
  40. # matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
  41. # 单应性矩阵
  42. matrix, _ = cv2.findHomography(src_pts, dst_pts)
  43. return PerspectiveMatrix(matrix=matrix, target=target)
  44. class ImageTransformer:
  45. @staticmethod
  46. def PerspectiveTransform(
  47. image: np.ndarray,
  48. perspective_matrix: PerspectiveMatrix,
  49. flags=cv2.INTER_LINEAR,
  50. borderMode=cv2.BORDER_CONSTANT,
  51. borderValue=(114, 114, 114)
  52. ) -> np.ndarray:
  53. if image is None or perspective_matrix is None:
  54. raise ValueError("Input image and warpaffine_matrix cannot be None.")
  55. transformed_image = cv2.warpPerspective(
  56. image,
  57. perspective_matrix.matrix,
  58. perspective_matrix.target,
  59. flags=flags,
  60. borderMode=borderMode,
  61. borderValue=borderValue)
  62. return transformed_image
  63. class AreaManage(object):
  64. def __init__(self):
  65. self.door_areas = [
  66. [(934, 143), (958, 130), (961, 165), (936, 181)],
  67. [(564, 399), (682, 432), (684, 528), (574, 493)]]
  68. self.door_perspective_matrix = []
  69. self.dst_door_points = [(0,0), (224, 0), (224, 224), (0, 224)]
  70. for p in self.door_areas:
  71. perspective_matrix = PerspectiveMatrix.perspective_matrix(p, self.dst_door_points, (224, 224))
  72. self.door_perspective_matrix.append(perspective_matrix)
  73. # 预设值的三个门对应的人的活动区域
  74. self.person_areas = [
  75. [(900, 152), (914, 84), (637, 20), (604, 73)],
  76. [(860, 200), (958, 226), (687, 432), (586, 395)]]
  77. self.person_areas_polygon = [Polygon(area) for area in self.person_areas]
  78. def update(self, person_area):
  79. areas = []
  80. person_area_polygon = Polygon(person_area)
  81. for pap in self.person_areas_polygon:
  82. areas.append(person_area_polygon.intersection(pap).area)
  83. max_area = max(areas)
  84. if max_area > 0:
  85. max_idx = areas.index(max_area)
  86. self.person_areas_polygon[max_idx] = person_area_polygon
  87. def inner_area(self, person_polygon):
  88. for i in range(len(self.person_areas_polygon)):
  89. if person_polygon.intersection(self.person_areas_polygon[i]).area / person_polygon.area > 0.5:
  90. return i
  91. return -1
  92. """
  93. 1. 人员检测
  94. 2. 检查区域内是否有人
  95. 3. 如果区域内有人,分类该区域的门是否关闭
  96. 4. 如果关闭,上报违章
  97. """
  98. class DoorInference(object):
  99. """
  100. human_model_path : 检测人的模型地址
  101. door_model_path : 门分类的模型地址
  102. person_areas : 电子围栏区域 [[(x,y),(x,y),(x,y),(x,y),...],[(x,y),(x,y),(x,y),(x,y),...],...]
  103. device_id : 显卡id, -1表示使用cpu
  104. confidence_threshold : 检测人的模型的阈值
  105. """
  106. def __init__(self, human_model_path, door_model_path, person_areas, device_id=0, confidence_threshold= 0.5) -> "DoorInference":
  107. self.device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() and device_id !=-1 else "cpu")
  108. self.confidence_threshold = confidence_threshold
  109. self.human_model = YOLO(human_model_path).to(self.device)
  110. self.door_model = YOLO(door_model_path).to(self.device)
  111. self.door_names = {0: 'block', 1: 'close', 2: 'open'}
  112. self.am = AreaManage()
  113. if person_areas:
  114. for person_area in person_areas:
  115. self.am.update(person_area)
  116. def __repr__(self):
  117. infer_str = f"DoorInference(device = {self.device})"
  118. return infer_str
  119. def rect(self, point_list):
  120. """
  121. 根据给定的点计算矩形框
  122. :param point_list: [(x1, y1), (x2, y2), ..., (xn, yn)] 多个点
  123. :return: 左上角和右下角的坐标,表示矩形的框
  124. """
  125. # 获取所有点的 x 和 y 坐标的最小和最大值
  126. min_x = min(point[0] for point in point_list)
  127. max_x = max(point[0] for point in point_list)
  128. min_y = min(point[1] for point in point_list)
  129. max_y = max(point[1] for point in point_list)
  130. # 计算矩形框的左上角和右下角
  131. left, top = min_x, min_y
  132. right, bottom = max_x, max_y
  133. return left, top ,right, bottom
  134. """
  135. 返回所有检测到的人
  136. """
  137. def person_detect(self, image):
  138. objs = []
  139. confs = []
  140. results = self.human_model(image, stream=False, classes=[4], conf=self.confidence_threshold, iou=0.3, imgsz=640)
  141. for result in results:
  142. boxes = result.boxes.cpu()
  143. for box in boxes:
  144. conf = box.conf.cpu().numpy().tolist()[0]
  145. left, top, right, bottom = box.xyxy.tolist()[0]
  146. objs.append([(left, top), (right, top), (right, bottom), (left, bottom)])
  147. confs.append(conf)
  148. return objs, confs
  149. """
  150. 判断门是否关闭
  151. """
  152. def is_door_close(self, image, index) -> bool:
  153. # 图片透视变换, 只需要门的区域
  154. # 比只使用矩形框更大程度的只保留门的区域
  155. door_image = ImageTransformer.PerspectiveTransform(image, self.am.door_perspective_matrix[index])
  156. # cv2.imshow('0',door_image)
  157. # cv2.waitKey(30000)
  158. res = self.door_model(door_image)
  159. class_idx = res[0].probs.top1
  160. class_conf = res[0].probs.top1conf.cpu().numpy()
  161. return class_idx == 1, class_conf
  162. """
  163. image 输入待识别图片
  164. 返回需要画框的门和人的坐标
  165. """
  166. def __call__(self, image):
  167. inner_person = {0:[], 1:[], 2:[]}
  168. person_boxes, person_confs = self.person_detect(image)
  169. for person_box, person_conf in zip(person_boxes, person_confs):
  170. person_polygon = Polygon(person_box)
  171. idx = self.am.inner_area(person_polygon)
  172. if idx == -1: continue
  173. inner_person[idx].append({"box" : person_box, "conf" : person_conf})
  174. result = []
  175. for i, persons in inner_person.items():
  176. if len(persons) == 0:
  177. continue
  178. close, class_conf = self.is_door_close(image, i)
  179. if close:
  180. left, top ,right, bottom = self.rect(self.am.door_areas[i])
  181. result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=class_conf, label="door_close"))
  182. for person in persons:
  183. left, top ,right, bottom = self.rect(person["box"])
  184. conf = person["conf"]
  185. result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=conf, label="person"))
  186. return result
  187. if __name__ == "__main__":
  188. logger.info("====== Start Server =======")
  189. human_model_path = "models/work_clo_person_head_hat.pt"
  190. door_model_path = "models/door_classify.pt"
  191. #image_path = "images/camera1_9ce1aca78db14c5807a271cb154fed5b.jpg"
  192. #image = cv2.imread(image_path)
  193. # logger.info(video)
  194. # (222, 59), (432, 3), (528, 96), (318, 198) 这个区域是为了测试,画大了的
  195. test_area = [[(222, 59), (432, 3), (528, 96), (318, 198)]]
  196. instance = DoorInference(human_model_path, door_model_path, person_areas=None)
  197. # 返回需要门和人
  198. ip = '172.19.152.231'
  199. channel = '45'
  200. stream = StreamCapture(ip, channel)
  201. posttime = time.time() - 30
  202. frame = cv2.imread("images/test.jpg")
  203. image = frame.copy()
  204. result = instance(image)
  205. if len(result) > 0 and time.time() - posttime > 30:
  206. try:
  207. posttime = time.time()
  208. videoTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
  209. fileTime = time.strftime('%Y-%m-%d-%H:%M:%S',time.localtime())
  210. filename = fileTime + ".jpg"
  211. filenameori = fileTime + "det.jpg"
  212. logger.info(videoTime)
  213. logger.info(result)
  214. for res in result:
  215. cv2.rectangle(image, tuple(map(int, (res.left, res.top))), tuple(map(int, (res.right, res.bottom))), (255,0, 0), 4)
  216. success, encoded_image = cv2.imencode('.jpg', image)
  217. content = encoded_image.tobytes()
  218. successori, encoded_imageori = cv2.imencode('.jpg',frame)
  219. contentori = encoded_imageori.tobytes()
  220. payload = {'channel': '45',
  221. 'classIndex': '8',
  222. 'ip': '172.19.152.231',
  223. 'videoTime': videoTime,
  224. 'videoUrl': stream.stream_url}
  225. files = [
  226. ('file', (filename, content, 'image/jpeg')),
  227. ('oldFile', (filenameori, contentori, 'image/jpeg')),
  228. ]
  229. result = requests.post('http://172.19.145.197/open/api/operate/upload', data=payload, files=files)
  230. logger.info(result)
  231. except Exception as error:
  232. logger.error('Error : ', str(error))
  233. logger.info("======= EXIT =======")
  234. #cv2.imwrite("result/result.jpg", image)