infer.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. '''
  2. @File : infer.py
  3. @Time : 2024/10/17 11:18:00
  4. @Author : leon
  5. @Version : 1.0
  6. @Desc : None
  7. '''
  8. import cv2
  9. import torch
  10. import numpy as np
  11. from ultralytics import YOLO
  12. from shapely.geometry.polygon import Polygon
  13. from collections import namedtuple
  14. from typing import List, Tuple
  15. import requests
  16. from logger import logger
  17. from stream import StreamCapture
  18. Box = namedtuple("Box", ["left", "top", "right", "bottom", "confidence", "label"])
  19. class PerspectiveMatrix:
  20. def __init__(self, matrix: np.ndarray, target: Tuple[int, int]) -> None:
  21. self.matrix = matrix
  22. self.target = target
  23. def __repr__(self):
  24. matrix_str = np.array2string(self.matrix, formatter={'float_kind': lambda x: f"{x:.2f}"})
  25. return f"PerspectiveMatrix(matrix={matrix_str}, target={self.target})"
  26. @staticmethod
  27. def perspective_matrix(src: List[Tuple[int, int]], dst: List[Tuple[int, int]], target : Tuple[int, int]) -> "PerspectiveMatrix":
  28. """
  29. 计算透视变换矩阵。
  30. 参数:
  31. - src: 源图像上的 4 个点 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]
  32. - dst: 目标图像上的 4 个点 [(u1, v1), (u2, v2), (u3, v3), (u4, v4)]
  33. 返回:
  34. - PerspectiveMatrix: 透视变换矩阵
  35. """
  36. src_pts = np.array(src, dtype=np.float32)
  37. dst_pts = np.array(dst, dtype=np.float32)
  38. # 计算透视变换矩阵
  39. # matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
  40. # 单应性矩阵
  41. matrix, _ = cv2.findHomography(src_pts, dst_pts)
  42. return PerspectiveMatrix(matrix=matrix, target=target)
  43. class ImageTransformer:
  44. @staticmethod
  45. def PerspectiveTransform(
  46. image: np.ndarray,
  47. perspective_matrix: PerspectiveMatrix,
  48. flags=cv2.INTER_LINEAR,
  49. borderMode=cv2.BORDER_CONSTANT,
  50. borderValue=(114, 114, 114)
  51. ) -> np.ndarray:
  52. if image is None or perspective_matrix is None:
  53. raise ValueError("Input image and warpaffine_matrix cannot be None.")
  54. transformed_image = cv2.warpPerspective(
  55. image,
  56. perspective_matrix.matrix,
  57. perspective_matrix.target,
  58. flags=flags,
  59. borderMode=borderMode,
  60. borderValue=borderValue)
  61. return transformed_image
  62. class AreaManage(object):
  63. def __init__(self):
  64. self.door_areas = [
  65. [(934, 143), (958, 130), (961, 165), (936, 181)],
  66. [(564, 399), (682, 432), (684, 528), (574, 493)]]
  67. self.door_perspective_matrix = []
  68. self.dst_door_points = [(0,0), (224, 0), (224, 224), (0, 224)]
  69. for p in self.door_areas:
  70. perspective_matrix = PerspectiveMatrix.perspective_matrix(p, self.dst_door_points, (224, 224))
  71. self.door_perspective_matrix.append(perspective_matrix)
  72. # 预设值的三个门对应的人的活动区域
  73. self.person_areas = [
  74. [(900, 152), (914, 84), (637, 20), (604, 73)],
  75. [(860, 200), (958, 226), (687, 432), (586, 395)]]
  76. self.person_areas_polygon = [Polygon(area) for area in self.person_areas]
  77. def update(self, person_area):
  78. areas = []
  79. person_area_polygon = Polygon(person_area)
  80. for pap in self.person_areas_polygon:
  81. areas.append(person_area_polygon.intersection(pap).area)
  82. max_area = max(areas)
  83. if max_area > 0:
  84. max_idx = areas.index(max_area)
  85. self.person_areas_polygon[max_idx] = person_area_polygon
  86. def inner_area(self, person_polygon):
  87. for i in range(len(self.person_areas_polygon)):
  88. if person_polygon.intersection(self.person_areas_polygon[i]).area / person_polygon.area > 0.5:
  89. return i
  90. return -1
  91. """
  92. 1. 人员检测
  93. 2. 检查区域内是否有人
  94. 3. 如果区域内有人,分类该区域的门是否关闭
  95. 4. 如果关闭,上报违章
  96. """
  97. class DoorInference(object):
  98. """
  99. human_model_path : 检测人的模型地址
  100. door_model_path : 门分类的模型地址
  101. person_areas : 电子围栏区域 [[(x,y),(x,y),(x,y),(x,y),...],[(x,y),(x,y),(x,y),(x,y),...],...]
  102. device_id : 显卡id, -1表示使用cpu
  103. confidence_threshold : 检测人的模型的阈值
  104. """
  105. def __init__(self, human_model_path, door_model_path, person_areas, device_id=0, confidence_threshold= 0.5) -> "DoorInference":
  106. self.device = torch.device(f"cuda:{device_id}" if torch.cuda.is_available() and device_id !=-1 else "cpu")
  107. self.confidence_threshold = confidence_threshold
  108. self.human_model = YOLO(human_model_path).to(self.device)
  109. self.door_model = YOLO(door_model_path).to(self.device)
  110. self.door_names = {0: 'block', 1: 'close', 2: 'open'}
  111. self.am = AreaManage()
  112. if person_areas:
  113. for person_area in person_areas:
  114. self.am.update(person_area)
  115. def __repr__(self):
  116. infer_str = f"DoorInference(device = {self.device})"
  117. return infer_str
  118. def rect(self, point_list):
  119. """
  120. 根据给定的点计算矩形框
  121. :param point_list: [(x1, y1), (x2, y2), ..., (xn, yn)] 多个点
  122. :return: 左上角和右下角的坐标,表示矩形的框
  123. """
  124. # 获取所有点的 x 和 y 坐标的最小和最大值
  125. min_x = min(point[0] for point in point_list)
  126. max_x = max(point[0] for point in point_list)
  127. min_y = min(point[1] for point in point_list)
  128. max_y = max(point[1] for point in point_list)
  129. # 计算矩形框的左上角和右下角
  130. left, top = min_x, min_y
  131. right, bottom = max_x, max_y
  132. return left, top ,right, bottom
  133. """
  134. 返回所有检测到的人
  135. """
  136. def person_detect(self, image):
  137. objs = []
  138. confs = []
  139. results = self.human_model(image, stream=False, classes=[4], conf=self.confidence_threshold, iou=0.3, imgsz=640)
  140. for result in results:
  141. boxes = result.boxes.cpu()
  142. for box in boxes:
  143. conf = box.conf.cpu().numpy().tolist()[0]
  144. left, top, right, bottom = box.xyxy.tolist()[0]
  145. objs.append([(left, top), (right, top), (right, bottom), (left, bottom)])
  146. confs.append(conf)
  147. return objs, confs
  148. """
  149. 判断门是否关闭
  150. """
  151. def is_door_close(self, image, index) -> bool:
  152. # 图片透视变换, 只需要门的区域
  153. # 比只使用矩形框更大程度的只保留门的区域
  154. door_image = ImageTransformer.PerspectiveTransform(image, self.am.door_perspective_matrix[index])
  155. # cv2.imshow('0',door_image)
  156. # cv2.waitKey(30000)
  157. res = self.door_model(door_image)
  158. class_idx = res[0].probs.top1
  159. class_conf = res[0].probs.top1conf.cpu().numpy()
  160. return class_idx == 1, class_conf
  161. """
  162. image 输入待识别图片
  163. 返回需要画框的门和人的坐标
  164. """
  165. def __call__(self, image):
  166. inner_person = {0:[], 1:[], 2:[]}
  167. person_boxes, person_confs = self.person_detect(image)
  168. for person_box, person_conf in zip(person_boxes, person_confs):
  169. person_polygon = Polygon(person_box)
  170. idx = self.am.inner_area(person_polygon)
  171. if idx == -1: continue
  172. inner_person[idx].append({"box" : person_box, "conf" : person_conf})
  173. result = []
  174. for i, persons in inner_person.items():
  175. if len(persons) == 0:
  176. continue
  177. close, class_conf = self.is_door_close(image, i)
  178. if close:
  179. left, top ,right, bottom = self.rect(self.am.door_areas[i])
  180. result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=class_conf, label="door_close"))
  181. for person in persons:
  182. left, top ,right, bottom = self.rect(person["box"])
  183. conf = person["conf"]
  184. result.append(Box(left=left, top=top, right=right, bottom=bottom, confidence=conf, label="person"))
  185. return result
  186. if __name__ == "__main__":
  187. logger.info("Start Server")
  188. human_model_path = "models/work_clo_person_head_hat.pt"
  189. door_model_path = "models/door_classify.pt"
  190. #image_path = "images/camera1_9ce1aca78db14c5807a271cb154fed5b.jpg"
  191. #image = cv2.imread(image_path)
  192. # logger.info(video)
  193. # (222, 59), (432, 3), (528, 96), (318, 198) 这个区域是为了测试,画大了的
  194. test_area = [[(222, 59), (432, 3), (528, 96), (318, 198)]]
  195. instance = DoorInference(human_model_path, door_model_path, person_areas=None)
  196. # 返回需要门和人
  197. ip = '172.19.152.231'
  198. channel = '45'
  199. stream = StreamCapture(ip, channel)
  200. posttime = 0
  201. for frame, ret in stream():
  202. if not ret:continue
  203. image = frame.copy()
  204. result = instance(image)
  205. if len(result)>0 and time.time()-posttime>30:
  206. videoTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
  207. logger.info(videoTime)
  208. logger.info(result)
  209. for res in result:
  210. cv2.rectangle(image, tuple(map(int, (res.left, res.top))), tuple(map(int, (res.right, res.bottom))), (255,0, 0), 4)
  211. success, encoded_image = cv2.imencode('.jpg', image)
  212. content = encoded_image.tobytes()
  213. successori, encoded_imageori = cv2.imencode('.jpg',frame)
  214. contentori = encoded_imageori.tobytes()
  215. payload = {'channel': '45',
  216. 'classIndex': '8',
  217. 'ip': '172.19.152.231',
  218. 'videoTime': videoTime,
  219. 'videoUrl': video}
  220. files = [
  221. ('file', (filename, content, 'image/jpeg')),
  222. ('oldFile', (filenameori, contentori, 'image/jpeg')),
  223. ]
  224. try:
  225. result = requests.post('http://172.19.145.197/open/api/operate/upload', data=payload, files=files)
  226. logger.info(result)
  227. except Exception:
  228. logger.info('posterror')
  229. logger.info("EXIT")
  230. #cv2.imwrite("result/result.jpg", image)
  231. """
  232. 1. 检测工服、安全帽等
  233. 2. 检测人的模型
  234. 3. 安全帽的分类
  235. 逻辑
  236. 1. 检测人
  237. 2. 检测安全帽、工服、头 检测到人没有工服报警
  238. 3. 安全帽头再分类
  239. """