person_speed.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import cv2
  2. import numpy as np
  3. from ultralytics import YOLO
  4. from collections import defaultdict, deque
  5. import datetime
  6. import time
  7. import math
  8. # Load the YOLO11 model
  9. model = YOLO("yolo11m.pt")
  10. # Open the video file
  11. video_path = r"E:\desktop_file\速度标定\run.mp4"
  12. # video_path = r"E:\wx_file\WeChat Files\wxid_1lcmt2w2jdwl22\FileStorage\File\2024-11\3.4-13时胶乳包装.mp4"
  13. cap = cv2.VideoCapture(video_path)
  14. # 存储最近的200帧用于回溯
  15. frame_buffer = deque(maxlen=100) # 新增帧缓冲区
  16. # 后续帧收集任务列表,元素格式:(pre_buffer, post_buffer, 剩余帧数, 触发时间)
  17. active_tasks = []
  18. # Store the track history
  19. track_history = defaultdict(lambda: [])
  20. # 用于存储每个 track_id 最近的时间戳
  21. time_stamps = defaultdict(lambda: deque(maxlen=200)) # 固定长度为 50
  22. # 用于存储瞬时速度
  23. instantaneous_velocities = defaultdict(lambda: deque(maxlen=100))
  24. def apply_bias(position):
  25. """
  26. 偏置函数:使用 x/ln(1+x) 计算偏置
  27. 已弃用
  28. """
  29. x, y = position
  30. bias_x = np.log1p(x) if x > 0 else 0
  31. bias_y = np.log1p(y) if y > 0 else 0
  32. return np.array([bias_x, bias_y])
  33. def save_high_speed_video(buffer, trigger_time):
  34. """将缓冲区中的帧保存为MP4文件"""
  35. if len(buffer) < 1:
  36. return
  37. # 生成唯一文件名
  38. timestamp = trigger_time.strftime("%Y%m%d%H%M%S%f")
  39. output_path = f"high_speed_{timestamp}.mp4"
  40. # 使用MP4编码(需确保OpenCV支持)
  41. fourcc_mp4 = cv2.VideoWriter_fourcc(*'x264')
  42. writer = cv2.VideoWriter(output_path, fourcc_mp4, fps, (frame_width, frame_height))
  43. for frame in buffer:
  44. writer.write(frame)
  45. writer.release()
  46. def map_to_ellipse(position):
  47. x, y = position
  48. center_x = 640
  49. center_y = 360
  50. a = 580
  51. b = 280
  52. x_norm = x / 1280
  53. y_norm = y / 720
  54. d_norm = math.sqrt((x_norm - 0.5) ** 2 + (y_norm - 0.5) ** 2)
  55. theta_norm = math.atan2(y_norm - 0.5, x_norm - 0.5)
  56. f = d_norm
  57. a_new = a * f
  58. b_new = b * f
  59. bias_x = center_x + a_new * math.cos(theta_norm)
  60. bias_y = center_y + b_new * math.sin(theta_norm)
  61. return np.array([bias_x, bias_y])
  62. # 创建 VideoWriter 对象以保存输出视频
  63. fourcc = cv2.VideoWriter_fourcc(*'XVID') # 视频编码格式
  64. output_file = "output_video.avi" # 输出文件名
  65. fps = 25 # 帧率
  66. frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  67. frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  68. out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
  69. speed_threshold = 20 # 速度阈值
  70. high_velocity_count_threshold = 20 # 高速度计数阈值
  71. # Loop through the video frames
  72. while cap.isOpened():
  73. # 记录当前时间
  74. current_time = time.time()
  75. # Read a frame from the video
  76. success, frame = cap.read()
  77. if success:
  78. # 将当前帧加入缓冲区(深拷贝避免覆盖)
  79. frame_buffer.append(frame.copy()) # 新增
  80. # 处理后续帧收集任务
  81. for i in reversed(range(len(active_tasks))):
  82. pre_buffer, post_buffer, frames_left, trigger_time = active_tasks[i]
  83. post_buffer.append(frame.copy())
  84. frames_left -= 1
  85. if frames_left <= 0:
  86. combined = pre_buffer + list(post_buffer)
  87. save_high_speed_video(combined, trigger_time)
  88. del active_tasks[i]
  89. else:
  90. active_tasks[i] = (pre_buffer, post_buffer, frames_left, trigger_time)
  91. # Run YOLO11 tracking on the frame, persisting tracks between frames
  92. results = model.track(frame, persist=True, classes=0, conf=0.6)
  93. if results[0].boxes and results[0].boxes.id is not None:
  94. # Get the boxes and track IDs
  95. boxes = results[0].boxes.xywh.cpu()
  96. track_ids = results[0].boxes.id.int().cpu().tolist()
  97. for box, track_id in zip(boxes, track_ids):
  98. x, y, w, h = box
  99. # 绘制边界框
  100. cv2.rectangle(frame, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (0, 255, 0), 2)
  101. # 计算左下角坐标
  102. bottom_left_x = int(x - w / 2)
  103. bottom_left_y = int(y + h / 2)
  104. # 计算中心点
  105. center_x = int(x)
  106. center_y = int(y)
  107. # 绘制中心点
  108. cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1) # 红色中心点,半径为 5
  109. # 记录位置
  110. track_history[track_id].append((bottom_left_x, bottom_left_y))
  111. if len(track_history[track_id]) > 100:
  112. del track_history[track_id][:-50] # 维持历史长度
  113. # 记录每一帧的时间
  114. time_stamps[track_id].append(current_time)
  115. # 计算时间间隔
  116. if len(time_stamps[track_id]) > 1:
  117. delta_time = time_stamps[track_id][-1] - time_stamps[track_id][-2] # 最近两帧的时间差
  118. else:
  119. delta_time = 0
  120. instantaneous_velocity = 0
  121. # 计算二维瞬时速度
  122. if len(track_history[track_id]) >= 2:
  123. pos1 = np.array(track_history[track_id][-1]) # 最新位置
  124. pos2 = np.array(track_history[track_id][-2]) # 前一个位置
  125. pos1 = map_to_ellipse(pos1)
  126. pos2 = map_to_ellipse(pos2)
  127. distance = np.linalg.norm(pos1 - pos2)
  128. # 使用时间间隔进行速度计算
  129. instantaneous_velocity = distance / delta_time if delta_time > 0 else np.zeros(2)
  130. instantaneous_velocity_magnitude = round(np.linalg.norm(instantaneous_velocity), 1)
  131. instantaneous_velocities[track_id].append(instantaneous_velocity_magnitude)
  132. else:
  133. instantaneous_velocity_magnitude = 0
  134. # 判断是否有足够数量的高速度
  135. high_velocity_count = sum(1 for velocity in instantaneous_velocities[track_id] if velocity > speed_threshold)
  136. if high_velocity_count >= high_velocity_count_threshold:
  137. # 原逻辑:截图,标红
  138. # cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
  139. # cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
  140. # data_time = str(datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
  141. # file_name = "high_speed_" + data_time + ".jpg"
  142. # cv2.imwrite(file_name, frame)
  143. # 触发保存逻辑
  144. pre_buffer = list(frame_buffer) # 深拷贝前200帧
  145. post_buffer = deque(maxlen=100)
  146. trigger_time = datetime.datetime.now()
  147. active_tasks.append((pre_buffer, post_buffer, 100, trigger_time))
  148. # 新增逻辑:删除超过 speed_threshold 的瞬时速度
  149. instantaneous_velocities[track_id] = deque(
  150. [velocity for velocity in instantaneous_velocities[track_id] if velocity <= speed_threshold],
  151. maxlen=100
  152. )
  153. # 新增保存视频逻辑
  154. data_time = datetime.datetime.now()
  155. save_high_speed_video(frame_buffer, data_time)
  156. else:
  157. cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
  158. cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 121, 23), 2)
  159. # Save the annotated frame to the output video
  160. out.write(frame) # 将处理后的视频帧写入文件
  161. # Display the annotated frame
  162. cv2.imshow("YOLO11 Tracking", frame)
  163. # Break the loop if 'q' is pressed
  164. if cv2.waitKey(1) & 0xFF == ord("q"):
  165. break
  166. else:
  167. # Break the loop if the end of the video is reached
  168. break
  169. # Release the video capture object and close the display window
  170. cap.release()
  171. out.release() # 释放 VideoWriter 对象
  172. cv2.destroyAllWindows()