123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import cv2
- import torch
- from shapely.geometry import box, Polygon
- import threading
- import numpy as np
- from datetime import datetime
- from ultralytics import YOLO
- from utils.tool import IoU
- from globals import stop_event,redis_client
- from config import SAVE_IMG_PATH,POST_IMG_PATH4,BASKET_CLEANING_VIDEO_SOURCES,BASKET_CLEANING_MODEL_SOURCES
- from globals import basket_suspension_flag,basket_warning_zone_flag,basket_steel_wire_flag
- def init_basket_cleaning_detection():
- # global platform_setup_steps_detect_num,platform_setup_final_result,platform_setup_steps_img
- # platform_setup_final_result=[0]*14
- # platform_setup_steps_detect_num=[0]*14
- # platform_setup_steps_img=[False]*14
- # redis_client.delete("platform_setup_order")
- # for i in range(1, 14):
- # redis_client.delete(f"platform_setup_{i}")
- # redis_client.delete(f"platform_setup_{i}_img")
- pass
- def start_basket_cleaning_detection(start_events):
- threads = []
- for model_path, video_source in zip(BASKET_CLEANING_VIDEO_SOURCES, BASKET_CLEANING_MODEL_SOURCES):
- event = threading.Event()
- start_events.append(event)
- thread = threading.Thread(target=process_video, args=(model_path, video_source,event))
- threads.append(thread)
- thread.daemon=True
- thread.start()
- # Wait for all threads to complete
- for thread in threads:
- thread.join()
- print("吊篮清洗子线程运行结束")
- def point_in_region(point, region):#判断点是否在多边形内
- is_inside = cv2.pointPolygonTest(region.reshape((-1, 1, 2)), point, False)
- return is_inside >= 0
- def process_video(model_path, video_source,start_event):
- model = YOLO(model_path)
- cap = cv2.VideoCapture(video_source)
- while cap.isOpened():
- if stop_event.is_set():#控制停止推理
- break
- success, frame = cap.read()
- if success:
- # Run YOLOv8 inference on the frame
- if cap.get(cv2.CAP_PROP_POS_FRAMES) % 25 != 0:
- continue
- results = model.predict(frame,conf=0.6,verbose=False)
-
- #悬挂机构区域,分为四个区域
- SUSPENSION_REGION = np.array([[[668, 310], [800, 310], [800, 1070], [668, 1070]],
- [[1690, 310], [1750, 310], [1750, 710], [1690, 710]],
- [[1350, 340], [1405, 340], [1405, 720], [1350, 720]],
- [[550, 385], [635, 385], [635, 880], [550, 880]]], np.int32)
-
- STEEL_WIRE_REGION = np.array([[[0, 0], [0, 0], [0, 0], [0, 0]],[[668, 310], [800, 310], [800, 1070], [668, 1070]],[[0, 0], [0, 0], [0, 0], [0, 0]],[[668, 310], [800, 310], [800, 1070], [668, 1070]]], np.int32)#钢丝绳区域,暂时没有钢丝绳的区域
- global basket_suspension_flag,basket_warning_zone_flag,basket_steel_wire_flag
- for r in results:
- if model_path==BASKET_CLEANING_MODEL_SOURCES[0] and not basket_suspension_flag:#D4悬挂机构
- boxes=r.boxes.xyxy#人体的检测框
- keypoints = r.keypoints.xy
- confidences = r.keypoints.conf
- for i in range(len(boxes)):
- left_elbow, right_elbow, left_wrist, right_wrist = keypoints[i][7:11].tolist()#获取左右手腕和左右肘的坐标
- points = [left_elbow, right_elbow, left_wrist, right_wrist]
- is_inside1 = all(point_in_region(point, SUSPENSION_REGION[0]) for point in points)
- is_inside2 = all(point_in_region(point, SUSPENSION_REGION[1]) for point in points)
- is_inside3 = all(point_in_region(point, SUSPENSION_REGION[2]) for point in points)
- is_inside4 = all(point_in_region(point, SUSPENSION_REGION[3]) for point in points)
- if is_inside1 or is_inside2 or is_inside3 or is_inside4:
- basket_suspension_flag=True#悬挂机构
-
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[1]:#D5吊篮悬挂
- boxes = r.boxes.xyxy
- confidences = r.boxes.conf
- classes = r.boxes.cls
- basket_warning_zone_flag=False#当检测不到则为False
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- confidence = confidences[i].item()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=='warning_zone':
- basket_warning_zone_flag=True
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[2]:#D6,pose
- boxes=r.boxes.xyxy#人体的检测框
- keypoints = r.keypoints.xy
- confidences = r.keypoints.conf
- for i in range(len(boxes)):
- left_elbow, right_elbow, left_wrist, right_wrist = keypoints[i][7:11].tolist()#获取左右手腕和左右肘的坐标
- points = [left_elbow, right_elbow, left_wrist, right_wrist]
- is_inside1 = all(point_in_region(point, STEEL_WIRE_REGION[0]) for point in points)
- is_inside2 = all(point_in_region(point, STEEL_WIRE_REGION[1]) for point in points)
- is_inside3 = all(point_in_region(point, STEEL_WIRE_REGION[2]) for point in points)
- is_inside4 = all(point_in_region(point, STEEL_WIRE_REGION[3]) for point in points)
- if is_inside1 or is_inside2 or is_inside3 or is_inside4:
- basket_steel_wire_flag=True#悬挂机构
-
- start_event.set()
- else:
- # Break the loop if the end of the video is reached
- break
- # Release the video capture object and close the display window
- cap.release()
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- del model
-
-
|