"""Track objects in a video with YOLO11 and save clips of fast-moving tracks.

For every tracked object the script records the bottom-left corner of its
bounding box, maps it through ``map_to_ellipse`` and derives an instantaneous
speed from consecutive positions.  When a track accumulates enough high-speed
samples, the frames before the trigger are written to an MP4 clip right away,
and a second clip containing the pre-trigger frames plus the following
post-trigger frames is written once those have been collected.
"""
import datetime
import math
import time
from collections import defaultdict, deque

import cv2
import numpy as np
from ultralytics import YOLO

# Number of raw frames kept for look-back before a trigger.
PRE_TRIGGER_FRAMES = 100
# Number of frames collected after a trigger before the combined clip is saved.
POST_TRIGGER_FRAMES = 100
# Number of recent speed samples kept per track.
VELOCITY_WINDOW = 100


def apply_bias(position):
    """Return ``log1p`` of each positive coordinate (0 for non-positive ones).

    Deprecated: not used by the tracking loop below.

    Args:
        position: Pair ``(x, y)``.

    Returns:
        ``np.ndarray`` of shape (2,) holding the biased coordinates.
    """
    x, y = position
    bias_x = np.log1p(x) if x > 0 else 0
    bias_y = np.log1p(y) if y > 0 else 0
    return np.array([bias_x, bias_y])


def save_high_speed_video(buffer, trigger_time):
    """Write the frames in *buffer* to ``high_speed_<timestamp>.mp4``.

    Uses the module-level ``fps``, ``frame_width`` and ``frame_height``, so it
    must only be called after those have been set.

    Args:
        buffer: Sized iterable (list or deque) of frames to write, in order.
        trigger_time: ``datetime`` used to build a unique file name.
    """
    if not buffer:
        return

    timestamp = trigger_time.strftime("%Y%m%d%H%M%S%f")
    output_path = f"high_speed_{timestamp}.mp4"

    # NOTE(review): whether this FourCC is usable depends on the OpenCV
    # build/backend; the isOpened() check below reports a failure instead of
    # silently producing no file.
    fourcc_mp4 = cv2.VideoWriter_fourcc(*'x264')
    writer = cv2.VideoWriter(output_path, fourcc_mp4, fps, (frame_width, frame_height))
    if not writer.isOpened():
        writer.release()
        print(f"warning: could not open video writer for {output_path}")
        return

    try:
        for frame in buffer:
            writer.write(frame)
    finally:
        # Always finalize the file, even if a write raises.
        writer.release()


def map_to_ellipse(position, *, frame_size=(1280, 720), semi_axes=(580, 280)):
    """Map a pixel position into an ellipse centred on the frame.

    The position is normalised by the frame size, converted to polar
    coordinates around the frame centre, and re-projected with the ellipse
    semi-axes scaled by the normalised radius.  Algebraically this is
    ``centre + semi_axis * (coordinate / frame_dimension - 0.5)`` per axis.

    Args:
        position: Pair ``(x, y)`` in pixels.
        frame_size: ``(width, height)`` used for normalisation and centring.
        semi_axes: ``(a, b)`` horizontal and vertical semi-axes of the ellipse.

    Returns:
        ``np.ndarray`` of shape (2,) with the mapped ``(x, y)``.
    """
    x, y = position
    frame_w, frame_h = frame_size
    a, b = semi_axes
    center_x = frame_w / 2
    center_y = frame_h / 2

    x_norm = x / frame_w
    y_norm = y / frame_h
    d_norm = math.sqrt((x_norm - 0.5) ** 2 + (y_norm - 0.5) ** 2)
    theta_norm = math.atan2(y_norm - 0.5, x_norm - 0.5)

    bias_x = center_x + a * d_norm * math.cos(theta_norm)
    bias_y = center_y + b * d_norm * math.sin(theta_norm)
    return np.array([bias_x, bias_y])


# Load the YOLO11 model
model = YOLO("yolo11m.pt")

# Open the video file
video_path = r"E:\desktop_file\速度标定\run.mp4"
# video_path = r"E:\wx_file\WeChat Files\wxid_1lcmt2w2jdwl22\FileStorage\File\2024-11\3.4-13时胶乳包装.mp4"
cap = cv2.VideoCapture(video_path)

# Most recent raw (un-annotated) frames, used as the pre-trigger part of a clip.
frame_buffer = deque(maxlen=PRE_TRIGGER_FRAMES)
# Clips still collecting post-trigger frames; each entry is
# (pre_buffer, post_buffer, frames_left, trigger_time).
active_tasks = []

# Per-track history of bottom-left box corners.
track_history = defaultdict(list)
# Per-track timestamps of the most recent observations (up to 200).
time_stamps = defaultdict(lambda: deque(maxlen=200))
# Per-track recent instantaneous speeds.
instantaneous_velocities = defaultdict(lambda: deque(maxlen=VELOCITY_WINDOW))

# VideoWriter for the annotated output video.
fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_file = "output_video.avi"
# Use the source frame rate so the output plays at the original speed; fall
# back to 25 when the capture does not report one.
fps = cap.get(cv2.CAP_PROP_FPS) or 25
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))

speed_threshold = 20  # a sample above this speed counts as "high speed"
high_velocity_count_threshold = 20  # high-speed samples needed to trigger a clip

try:
    # Loop through the video frames
    while cap.isOpened():
        # NOTE(review): timestamps are wall-clock, so the computed speed depends
        # on processing time per frame rather than on video time; confirm this
        # is intended when the input is a file.
        current_time = time.time()

        success, frame = cap.read()
        if not success:
            # End of the video (or a read failure).
            break

        # `frame` is annotated in place below, so keep one untouched copy.  The
        # copy is never mutated and can be shared by every buffer.
        raw_frame = frame.copy()
        frame_buffer.append(raw_frame)

        # Feed the new frame to every clip still collecting post-trigger frames.
        still_collecting = []
        for pre_buffer, post_buffer, frames_left, trigger_time in active_tasks:
            post_buffer.append(raw_frame)
            frames_left -= 1
            if frames_left <= 0:
                save_high_speed_video(pre_buffer + list(post_buffer), trigger_time)
            else:
                still_collecting.append((pre_buffer, post_buffer, frames_left, trigger_time))
        active_tasks[:] = still_collecting

        # Run YOLO11 tracking on the frame, persisting tracks between frames
        results = model.track(frame, persist=True, classes=0, conf=0.6)
        detections = results[0].boxes

        if detections and detections.id is not None:
            boxes = detections.xywh.cpu()
            track_ids = detections.id.int().cpu().tolist()

            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box  # box centre and size

                # Bounding box
                cv2.rectangle(frame, (int(x - w / 2), int(y - h / 2)),
                              (int(x + w / 2), int(y + h / 2)), (0, 255, 0), 2)

                # Bottom-left corner of the box (image y grows downwards).
                bottom_left_x = int(x - w / 2)
                bottom_left_y = int(y + h / 2)

                # Centre marker, radius 5; (255, 0, 0) is blue in OpenCV's BGR order.
                cv2.circle(frame, (int(x), int(y)), 5, (255, 0, 0), -1)

                # Record the position, trimming to the newest 50 once past 100.
                history = track_history[track_id]
                history.append((bottom_left_x, bottom_left_y))
                if len(history) > 100:
                    del history[:-50]

                # Time elapsed since this track was last observed.
                stamps = time_stamps[track_id]
                stamps.append(current_time)
                delta_time = stamps[-1] - stamps[-2] if len(stamps) > 1 else 0

                # Instantaneous speed between the two latest mapped positions.
                velocities = instantaneous_velocities[track_id]
                if len(history) >= 2:
                    newest = map_to_ellipse(history[-1])
                    previous = map_to_ellipse(history[-2])
                    distance = np.linalg.norm(newest - previous)
                    speed = distance / delta_time if delta_time > 0 else 0.0
                    instantaneous_velocity_magnitude = round(speed, 1)
                    velocities.append(instantaneous_velocity_magnitude)
                else:
                    instantaneous_velocity_magnitude = 0

                high_velocity_count = sum(1 for v in velocities if v > speed_threshold)

                if high_velocity_count >= high_velocity_count_threshold:
                    # Start collecting post-trigger frames.  list() is a shallow
                    # snapshot of the look-back buffer, which is safe because
                    # buffered frames are never modified.
                    trigger_time = datetime.datetime.now()
                    active_tasks.append((
                        list(frame_buffer),
                        deque(maxlen=POST_TRIGGER_FRAMES),
                        POST_TRIGGER_FRAMES,
                        trigger_time,
                    ))

                    # Drop the high-speed samples so the same burst does not
                    # trigger again on the next frame.
                    instantaneous_velocities[track_id] = deque(
                        (v for v in velocities if v <= speed_threshold),
                        maxlen=VELOCITY_WINDOW,
                    )

                    # Also save the pre-trigger frames immediately.
                    # NOTE(review): this clip is a prefix of the combined clip
                    # saved later for the same trigger; confirm both are wanted.
                    data_time = datetime.datetime.now()
                    save_high_speed_video(frame_buffer, data_time)
                else:
                    cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 121, 23), 2)

        # Save the annotated frame to the output video
        out.write(frame)

        # Display the annotated frame
        cv2.imshow("YOLO11 Tracking", frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # The stream ended (or the user quit) while some clips were still
    # collecting post-trigger frames: save what has been gathered so far
    # instead of dropping those clips.
    for pre_buffer, post_buffer, _frames_left, trigger_time in active_tasks:
        save_high_speed_video(pre_buffer + list(post_buffer), trigger_time)
    active_tasks.clear()
finally:
    # Release the capture, finalize the output video and close the window even
    # if processing raised.
    cap.release()
    out.release()
    cv2.destroyAllWindows()