123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- import cv2
- import numpy as np
- from ultralytics import YOLO
- from collections import defaultdict, deque
- import datetime
- import time
- import math
- model = YOLO("yolo11m.pt")
- video_path = r"E:\desktop_file\速度标定\run.mp4"
- cap = cv2.VideoCapture(video_path)
- frame_buffer = deque(maxlen=100)
- active_tasks = []
- track_history = defaultdict(lambda: [])
- time_stamps = defaultdict(lambda: deque(maxlen=200))
- instantaneous_velocities = defaultdict(lambda: deque(maxlen=100))
- def apply_bias(position):
- """
- 偏置函数:使用 x/ln(1+x) 计算偏置
- 已弃用
- """
- x, y = position
- bias_x = np.log1p(x) if x > 0 else 0
- bias_y = np.log1p(y) if y > 0 else 0
- return np.array([bias_x, bias_y])
- def save_high_speed_video(buffer, trigger_time):
- """将缓冲区中的帧保存为MP4文件"""
- if len(buffer) < 1:
- return
-
- timestamp = trigger_time.strftime("%Y%m%d%H%M%S%f")
- output_path = f"high_speed_{timestamp}.mp4"
-
- fourcc_mp4 = cv2.VideoWriter_fourcc(*'x264')
- writer = cv2.VideoWriter(output_path, fourcc_mp4, fps, (frame_width, frame_height))
- for frame in buffer:
- writer.write(frame)
- writer.release()
- def map_to_ellipse(position):
- x, y = position
- center_x = 640
- center_y = 360
- a = 580
- b = 280
- x_norm = x / 1280
- y_norm = y / 720
- d_norm = math.sqrt((x_norm - 0.5) ** 2 + (y_norm - 0.5) ** 2)
- theta_norm = math.atan2(y_norm - 0.5, x_norm - 0.5)
- f = d_norm
- a_new = a * f
- b_new = b * f
- bias_x = center_x + a_new * math.cos(theta_norm)
- bias_y = center_y + b_new * math.sin(theta_norm)
- return np.array([bias_x, bias_y])
- fourcc = cv2.VideoWriter_fourcc(*'XVID')
- output_file = "output_video.avi"
- fps = 25
- frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
- frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
- out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
- speed_threshold = 20
- high_velocity_count_threshold = 20
- while cap.isOpened():
-
- current_time = time.time()
-
- success, frame = cap.read()
- if success:
-
- frame_buffer.append(frame.copy())
-
- for i in reversed(range(len(active_tasks))):
- pre_buffer, post_buffer, frames_left, trigger_time = active_tasks[i]
- post_buffer.append(frame.copy())
- frames_left -= 1
- if frames_left <= 0:
- combined = pre_buffer + list(post_buffer)
- save_high_speed_video(combined, trigger_time)
- del active_tasks[i]
- else:
- active_tasks[i] = (pre_buffer, post_buffer, frames_left, trigger_time)
-
- results = model.track(frame, persist=True, classes=0, conf=0.6)
- if results[0].boxes and results[0].boxes.id is not None:
-
- boxes = results[0].boxes.xywh.cpu()
- track_ids = results[0].boxes.id.int().cpu().tolist()
- for box, track_id in zip(boxes, track_ids):
- x, y, w, h = box
-
- cv2.rectangle(frame, (int(x - w / 2), int(y - h / 2)), (int(x + w / 2), int(y + h / 2)), (0, 255, 0), 2)
-
- bottom_left_x = int(x - w / 2)
- bottom_left_y = int(y + h / 2)
-
- center_x = int(x)
- center_y = int(y)
-
- cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1)
-
- track_history[track_id].append((bottom_left_x, bottom_left_y))
- if len(track_history[track_id]) > 100:
- del track_history[track_id][:-50]
-
- time_stamps[track_id].append(current_time)
-
- if len(time_stamps[track_id]) > 1:
- delta_time = time_stamps[track_id][-1] - time_stamps[track_id][-2]
- else:
- delta_time = 0
- instantaneous_velocity = 0
-
- if len(track_history[track_id]) >= 2:
- pos1 = np.array(track_history[track_id][-1])
- pos2 = np.array(track_history[track_id][-2])
- pos1 = map_to_ellipse(pos1)
- pos2 = map_to_ellipse(pos2)
- distance = np.linalg.norm(pos1 - pos2)
-
- instantaneous_velocity = distance / delta_time if delta_time > 0 else np.zeros(2)
- instantaneous_velocity_magnitude = round(np.linalg.norm(instantaneous_velocity), 1)
- instantaneous_velocities[track_id].append(instantaneous_velocity_magnitude)
- else:
- instantaneous_velocity_magnitude = 0
-
- high_velocity_count = sum(1 for velocity in instantaneous_velocities[track_id] if velocity > speed_threshold)
- if high_velocity_count >= high_velocity_count_threshold:
-
-
-
-
-
-
-
- pre_buffer = list(frame_buffer)
- post_buffer = deque(maxlen=100)
- trigger_time = datetime.datetime.now()
- active_tasks.append((pre_buffer, post_buffer, 100, trigger_time))
-
- instantaneous_velocities[track_id] = deque(
- [velocity for velocity in instantaneous_velocities[track_id] if velocity <= speed_threshold],
- maxlen=100
- )
-
- data_time = datetime.datetime.now()
- save_high_speed_video(frame_buffer, data_time)
- else:
- cv2.putText(frame, str(instantaneous_velocity_magnitude), (int(x), int(y)),
- cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 121, 23), 2)
-
- out.write(frame)
-
- cv2.imshow("YOLO11 Tracking", frame)
-
- if cv2.waitKey(1) & 0xFF == ord("q"):
- break
- else:
-
- break
- cap.release()
- out.release()
- cv2.destroyAllWindows()
|