
feat: add a running-detection demo. New logic: if running is detected (time_stamps length changed to 200, instantaneous_velocities to 100; the speed threshold must be calibrated on the target device), save the preceding 200 frames as a video file.
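
In outline, the change keeps a rolling buffer of the last 200 frames and, once enough of a track's instantaneous speeds exceed the threshold, writes that buffer out as a clip. A minimal sketch of the pattern (the constants and the maybe_save_running_clip helper below are illustrative, not part of the committed file; the thresholds still need per-device calibration):

    from collections import deque
    import cv2

    BUFFER_LEN = 200           # frames kept for retrospective saving
    SPEED_THRESHOLD = 30       # calibrate per device
    HIGH_COUNT_THRESHOLD = 20  # fast samples needed to count as "running"

    frame_buffer = deque(maxlen=BUFFER_LEN)
    velocities = deque(maxlen=100)

    def maybe_save_running_clip(path, fps, size):
        """Write the buffered frames to disk when the running condition is met."""
        fast = sum(1 for v in velocities if v > SPEED_THRESHOLD)
        if fast < HIGH_COUNT_THRESHOLD:
            return False
        writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
        for f in frame_buffer:
            writer.write(f)
        writer.release()
        velocities.clear()  # avoid re-triggering on the same samples
        return True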

tuantuan, 3 weeks ago
parent
commit d627e89298
1 changed file with 196 additions and 0 deletions
  1. liq_demo/person_speed.py (+196, -0)

liq_demo/person_speed.py  +196 -0

@@ -0,0 +1,196 @@
+import cv2
+import numpy as np
+from ultralytics import YOLO
+from collections import defaultdict, deque
+import datetime
+import time
+import math
+# Load the YOLO11 model
+model = YOLO("yolo11m.pt")
+
+# Open the video file
+video_path = r"E:\desktop_file\速度标定\run.mp4"
+# video_path = r"E:\wx_file\WeChat Files\wxid_1lcmt2w2jdwl22\FileStorage\File\2024-11\3.4-13时胶乳包装.mp4"
+cap = cv2.VideoCapture(video_path)
+
+# Keep the most recent 200 frames for retrospective saving
+frame_buffer = deque(maxlen=200)  # newly added frame buffer
+
+# Store the track history
+track_history = defaultdict(lambda: [])
+# Most recent timestamps for each track_id
+time_stamps = defaultdict(lambda: deque(maxlen=200))  # fixed length of 200
+# Instantaneous speed samples for each track_id
+instantaneous_velocities = defaultdict(lambda: deque(maxlen=100))
+
+
+def apply_bias(position):
+    """
+    Bias function (deprecated): maps each coordinate to ln(1 + coordinate).
+    """
+    x, y = position
+    bias_x = np.log1p(x) if x > 0 else 0
+    bias_y = np.log1p(y) if y > 0 else 0
+    return np.array([bias_x, bias_y])
+
+
+def save_high_speed_video(buffer, trigger_time):
+    """Save the buffered frames to an MP4 file."""
+    if len(buffer) < 1:
+        return
+
+    # Generate a unique file name from the trigger time
+    timestamp = trigger_time.strftime("%Y%m%d%H%M%S%f")
+    output_path = f"high_speed_{timestamp}.mp4"
+
+    # MP4 codec; 'mp4v' is available in most OpenCV builds (an H.264 fourcc such as 'x264' often is not)
+    fourcc_mp4 = cv2.VideoWriter_fourcc(*'mp4v')
+    writer = cv2.VideoWriter(output_path, fourcc_mp4, fps, (frame_width, frame_height))
+
+    for frame in buffer:
+        writer.write(frame)
+    writer.release()
+
+
+def map_to_ellipse(position):
+    """Re-project a pixel position onto an ellipse around the frame centre.
+
+    The ellipse radii (a, b) are scaled by the point's normalised distance from
+    the centre of a 1280x720 frame before distances (and hence speeds) are computed.
+    """
+    x, y = position
+    center_x = 640
+    center_y = 360
+    a = 580
+    b = 280
+
+    x_norm = x / 1280
+    y_norm = y / 720
+
+    d_norm = math.sqrt((x_norm - 0.5) ** 2 + (y_norm - 0.5) ** 2)
+    theta_norm = math.atan2(y_norm - 0.5, x_norm - 0.5)
+    f = d_norm
+    a_new = a * f
+    b_new = b * f
+
+    bias_x = center_x + a_new * math.cos(theta_norm)
+    bias_y = center_y + b_new * math.sin(theta_norm)
+
+    return np.array([bias_x, bias_y])
+
+# Create a VideoWriter to save the annotated output video
+fourcc = cv2.VideoWriter_fourcc(*'XVID')  # video codec
+output_file = "output_video.avi"  # output file name
+fps = 25  # frame rate
+frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
+
+speed_threshold = 30  # speed threshold (calibrate per device)
+high_velocity_count_threshold = 20  # number of high-speed samples required to trigger
+
+# Loop through the video frames
+while cap.isOpened():
+    # Record the current time
+    current_time = time.time()
+
+    # Read a frame from the video
+    success, frame = cap.read()
+
+    if success:
+        # Append a copy of the current frame to the buffer so later drawing does not overwrite it
+        frame_buffer.append(frame.copy())  # newly added
+
+        # Run YOLO11 tracking on the frame, persisting tracks between frames
+        results = model.track(frame, persist=True, classes=0, conf=0.6)
+
+        if results[0].boxes and results[0].boxes.id is not None:
+            # Get the boxes and track IDs
+            boxes = results[0].boxes.xywh.cpu()
+            track_ids = results[0].boxes.id.int().cpu().tolist()
+
+            for box, track_id in zip(boxes, track_ids):
+                x, y, w, h = box
+
+                # Draw the bounding box
+                cv2.rectangle(frame, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (0, 255, 0), 2)
+                # Compute the bottom-left corner
+                bottom_left_x = int(x - w / 2)
+                bottom_left_y = int(y + h / 2)
+
+                # Compute the centre point
+                center_x = int(x)
+                center_y = int(y)
+
+                # Draw the centre point (blue in BGR, radius 5)
+                cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1)
+
+                # Record the position (bottom-left corner of the box)
+                track_history[track_id].append((bottom_left_x, bottom_left_y))
+                if len(track_history[track_id]) > 100:
+                    del track_history[track_id][:-50]  # keep only the most recent 50 positions
+
+                # Record the timestamp for this frame
+                time_stamps[track_id].append(current_time)
+
+                # Compute the time interval
+                if len(time_stamps[track_id]) > 1:
+                    delta_time = time_stamps[track_id][-1] - time_stamps[track_id][-2]  # time between the two most recent frames
+                else:
+                    delta_time = 0
+
+                instantaneous_velocity = 0
+                # Compute the instantaneous speed from the two most recent positions
+                if len(track_history[track_id]) >= 2:
+                    pos1 = np.array(track_history[track_id][-1])  # latest position
+                    pos2 = np.array(track_history[track_id][-2])  # previous position
+
+                    pos1 = map_to_ellipse(pos1)
+                    pos2 = map_to_ellipse(pos2)
+                    distance = np.linalg.norm(pos1 - pos2)
+
+                    # Speed = ellipse-mapped distance / elapsed time between the two frames
+                    instantaneous_velocity = distance / delta_time if delta_time > 0 else 0.0
+
+                    instantaneous_velocity_magnitude = round(instantaneous_velocity, 1)
+                    instantaneous_velocities[track_id].append(instantaneous_velocity_magnitude)
+                else:
+                    instantaneous_velocity_magnitude = 0
+
+                # Count how many recorded speeds exceed the threshold
+                high_velocity_count = sum(1 for velocity in instantaneous_velocities[track_id] if velocity > speed_threshold)
+
+                if high_velocity_count >= high_velocity_count_threshold:
+
+                    # Original logic: draw the speed in red and save a screenshot
+                    # cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
+                    #             cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
+                    # data_time = str(datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
+                    # file_name = "high_speed_" + data_time + ".jpg"
+                    # cv2.imwrite(file_name, frame)
+
+                    # New logic: drop speeds above speed_threshold so the trigger does not refire immediately
+                    instantaneous_velocities[track_id] = deque(
+                        [velocity for velocity in instantaneous_velocities[track_id] if velocity <= speed_threshold],
+                        maxlen=100
+                    )
+                    # New logic: save the buffered frames as a video clip
+                    data_time = datetime.datetime.now()
+                    save_high_speed_video(frame_buffer, data_time)
+                else:
+                    cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
+                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 121, 23), 2)
+
+            # Save the annotated frame to the output video
+            out.write(frame)
+
+            # Display the annotated frame
+            cv2.imshow("YOLO11 Tracking", frame)
+
+            # Break the loop if 'q' is pressed
+            if cv2.waitKey(1) & 0xFF == ord("q"):
+                break
+    else:
+        # Break the loop if the end of the video is reached
+        break
+
+# Release the video capture object and close the display window
+cap.release()
+out.release()  # release the VideoWriter
+cv2.destroyAllWindows()
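
A practical note on the codec: cv2.VideoWriter does not raise when the requested fourcc is unavailable in the local OpenCV build; it just produces an empty or unplayable file. A quick, illustrative sanity check (reusing fps, frame_width, and frame_height from the script above) is to test writer.isOpened() and fall back to a more common codec:

    probe = cv2.VideoWriter("probe.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps,
                            (frame_width, frame_height))
    if not probe.isOpened():
        # Fall back to a codec/container pair that most OpenCV builds ship with.
        probe = cv2.VideoWriter("probe.avi", cv2.VideoWriter_fourcc(*"XVID"), fps,
                                (frame_width, frame_height))
    probe.release()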