123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- import cv2
- import torch
- from shapely.geometry import box, Polygon
- import threading
- import numpy as np
- from datetime import datetime
- from ultralytics import YOLO
- from utils.tool import IoU
- from globals import stop_event,redis_client
- from config import SAVE_IMG_PATH,POST_IMG_PATH6,EQUIPMENT_CLEANING_VIDEO_SOURCES,EQUIPMENT_CLEANING_MODEL_SOURCES
- from config import EQUIPMENT_SAFETY_ROPE_REGION,EQUIPMENT_WORK_ROPE_REGION,EQUIPMENT_ANCHOR_DEVICE_REGION,EQUIPMENT_CLEANING_OPERATION_REGION
- from globals import equipment_cleaning_flag
- def init_equipment_cleaning_detection():
- global equipment_cleaning_flag
- equipment_cleaning_flag=[False]*12
-
- for i in range(1, 12):
- redis_client.delete(f"equipment_step_{i}")
- redis_client.delete("equipment_cleaning_order")
- pass
- def start_equipment_cleaning_detection(start_events):
- threads = []
- for model_path, video_source in zip(EQUIPMENT_CLEANING_MODEL_SOURCES,EQUIPMENT_CLEANING_VIDEO_SOURCES):
- event = threading.Event()
- start_events.append(event)
- thread = threading.Thread(target=process_video, args=(model_path, video_source,event))
- threads.append(thread)
- thread.daemon=True
- thread.start()
- print("单人吊具清洗子线程运行中")
- # Wait for any threads to complete
- for thread in threads:
- thread.join()
- print("单人吊具清洗子线程运行结束")
- def point_in_region(point, region):#判断点是否在多边形内
- is_inside = cv2.pointPolygonTest(region.reshape((-1, 1, 2)), point, False)
- return is_inside >= 0
- def save_image_and_redis(redis_client, results, step_name, save_path, post_path):
- save_time = datetime.now().strftime('%Y%m%d_%H%M')
- imgpath = f"{save_path}/{step_name}_{save_time}.jpg"
- annotated_frame = results[0].plot()
- # if step_name == "equipment_step_2":
- # annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_SUSPENSION_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- # elif step_name == "equipment_step_3":
- # annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_STEEL_WIRE_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- # elif step_name == "equipment_step_4":
- # annotated_frame = cv2.polylines(annotated_frame, BASKET_PLATFORM_REGION.reshape(-1, 1, 2), isClosed=True, color=(0, 255, 0), thickness=4)
- # elif step_name == "equipment_step_6":
- # annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_SAFETY_LOCK_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
-
- cv2.imwrite(imgpath, annotated_frame)
- redis_client.set(step_name, post_path)
- redis_client.rpush("equipment_cleaning_order", step_name)
- def process_video(model_path, video_source,start_event):
- model = YOLO(model_path)
- cap = cv2.VideoCapture(video_source)
-
- #cap.set(cv2.CAP_PROP_BUFFERSIZE, 100) # 设置缓冲区大小为 10 帧
- while cap.isOpened():
- if stop_event.is_set():#控制停止推理
- break
- success, frame = cap.read()
- if success:
- # Run YOLOv8 inference on the frame
- if cap.get(cv2.CAP_PROP_POS_FRAMES) % 25 != 0:
- continue
- results = model.predict(frame,conf=0.6,verbose=False)
- global equipment_cleaning_flag
- for r in results:
- if model_path==EQUIPMENT_CLEANING_MODEL_SOURCES[0]:#D3 detect
- boxes = r.boxes.xyxy
- confidences = r.boxes.conf
- classes = r.boxes.cls
-
- # basket_warning_zone_flag=False#当检测不到则为False
- # basket_cleaning_up_flag=False
- equipment_cleaning_flag[0]=False
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- confidence = confidences[i].item()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=='warning_zone':
- equipment_cleaning_flag[0]=True
- elif label=='person':
- centerx=(x1+x2)/2
- centery=(y1+y2)/2
- point_in_region_flag=point_in_region([centerx,centery],EQUIPMENT_ANCHOR_DEVICE_REGION)#挂点装置是否检测到人
- if point_in_region_flag:
- equipment_cleaning_flag[1]=True
- elif label=='seating_plate':
- equipment_cleaning_flag[2]=True
- elif label=='self_locking':
- equipment_cleaning_flag[5]=True
- equipment_cleaning_flag[6]=True
- elif label=='brush':
- equipment_cleaning_flag[10]=True
- elif not equipment_cleaning_flag[0]:#当检测不到警戒区时,判定未拆除警戒区域
- equipment_cleaning_flag[11]=True
-
- elif model_path==EQUIPMENT_CLEANING_MODEL_SOURCES[1]:#D8,pose
- boxes=r.boxes.xyxy#人体的检测框
- keypoints = r.keypoints.xy
- confidences = r.keypoints.conf
- basket_person_flag=False#当检测不到则为False
- for i in range(len(boxes)):
- #当有检测框,则说明有人
- #basket_person_flag=True
- left_wrist, right_wrist = keypoints[i][9:11].tolist()#获取左右手腕和左右肘的坐标
- points = [left_wrist, right_wrist]
- if not equipment_cleaning_flag[3]:
- is_inside1 = any(point_in_region(point, EQUIPMENT_WORK_ROPE_REGION[0]) for point in points)
- if is_inside1:
- equipment_cleaning_flag[3]=True#工作绳
- print("工作绳")
- if not equipment_cleaning_flag[4]:
- is_inside = any(point_in_region(point, EQUIPMENT_SAFETY_ROPE_REGION) for point in points)
- if is_inside:
- equipment_cleaning_flag[4]=True#安全绳
- print("安全绳")
-
- else:#d8 目标检测
- boxes = r.boxes.xyxy
- confidences = r.boxes.conf
- classes = r.boxes.cls
- #basket_safety_lock_flag=False#当检测不到则为False
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- confidence = confidences[i].item()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=='safety_belt':
- equipment_cleaning_flag[8]=True
- elif label=='brush':
- is_inside = any(point_in_region(point,EQUIPMENT_CLEANING_OPERATION_REGION) for point in points)#刷子是否在指定区域
- if is_inside:
- equipment_cleaning_flag[9]=True
- if model_path==EQUIPMENT_CLEANING_MODEL_SOURCES[0]:
- if equipment_cleaning_flag[0] and not redis_client.exists("equipment_step_1"):
- save_image_and_redis(redis_client, results, "equipment_step_1", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[1] and not redis_client.exists("equipment_step_2"):
- save_image_and_redis(redis_client, results, "equipment_step_2", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[2] and not redis_client.exists("equipment_step_3"):
- save_image_and_redis(redis_client, results, "equipment_step_3", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[5] and not redis_client.exists("equipment_step_6"):
- save_image_and_redis(redis_client, results, "equipment_step_6", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[10] and not redis_client.exists("equipment_step_11"):
- save_image_and_redis(redis_client, results, "equipment_step_11", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[11] and not redis_client.exists("equipment_step_12"):
- save_image_and_redis(redis_client, results, "equipment_step_12", SAVE_IMG_PATH, POST_IMG_PATH6)
-
- elif model_path==EQUIPMENT_CLEANING_MODEL_SOURCES[1]:
- if equipment_cleaning_flag[3] and not redis_client.exists("equipment_step_4"):
- save_image_and_redis(redis_client, results, "equipment_step_4", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[4] and not redis_client.exists("equipment_step_5"):
- save_image_and_redis(redis_client, results, "equipment_step_5", SAVE_IMG_PATH, POST_IMG_PATH6)
-
- else:#d6目标检测
- if equipment_cleaning_flag[8] and not redis_client.exists("equipment_step_9"):
- save_image_and_redis(redis_client, results, "equipment_step_9", SAVE_IMG_PATH, POST_IMG_PATH6)
- if equipment_cleaning_flag[9] and not redis_client.exists("equipment_step_10"):
- save_image_and_redis(redis_client, results, "equipment_step_10", SAVE_IMG_PATH, POST_IMG_PATH6)
- start_event.set()
- else:
- # Break the loop if the end of the video is reached
- break
- # Release the video capture object and close the display window
- cap.release()
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- del model
-
-
|