123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- import cv2
- import torch
- from shapely.geometry import box, Polygon
- import threading
- import numpy as np
- from datetime import datetime
- from ultralytics import YOLO
- from utils.tool import IoU
- from globals import stop_event,redis_client
- from config import SAVE_IMG_PATH,POST_IMG_PATH5,BASKET_CLEANING_VIDEO_SOURCES,BASKET_CLEANING_MODEL_SOURCES
- from config import BASKET_SUSPENSION_REGION,BASKET_STEEL_WIRE_REGION,BASKET_PLATFORM_REGION,BASKET_LIFTING_REGION,BASKET_ELECTRICAL_SYSTEM_REGION,BASKET_SAFETY_LOCK_REGION,BASKET_EMPTY_LOAD_REGION,BASKET_CLEANING_OPERATION_REGION
- from globals import basket_suspension_flag,basket_warning_zone_flag,basket_steel_wire_flag,basket_platform_flag,basket_person_flag
- from globals import basket_electrical_system_flag,basket_lifting_flag,basket_safety_lock_flag,basket_safety_belt_flag,basket_empty_load_flag,basket_cleaning_operation_flag,basket_cleaning_up_flag
- def init_basket_cleaning_detection():
- global basket_suspension_flag,basket_warning_zone_flag,basket_steel_wire_flag,basket_platform_flag,basket_electrical_system_flag,basket_lifting_flag,basket_safety_lock_flag,basket_safety_belt_flag,basket_cleaning_up_flag,basket_cleaning_operation_flag,basket_empty_load_flag,basket_person_flag
- basket_suspension_flag=False
- basket_warning_zone_flag=False
- basket_steel_wire_flag=False
- basket_platform_flag=False
- basket_electrical_system_flag=False
- basket_lifting_flag=False
- basket_safety_lock_flag=False
- basket_safety_belt_flag=False
- basket_cleaning_up_flag=False
- basket_cleaning_operation_flag=False
- basket_empty_load_flag=False
- basket_person_flag=False
-
- for i in range(1, 12):
- redis_client.delete(f"basket_step_{i}")
- redis_client.delete("basket_cleaning_order")
- pass
- def start_basket_cleaning_detection(start_events):
- threads = []
- for model_path, video_source in zip(BASKET_CLEANING_MODEL_SOURCES,BASKET_CLEANING_VIDEO_SOURCES):
- event = threading.Event()
- start_events.append(event)
- thread = threading.Thread(target=process_video, args=(model_path, video_source,event))
- threads.append(thread)
- thread.daemon=True
- thread.start()
- print("吊篮清洗子线程运行中")
- # Wait for any threads to complete
- for thread in threads:
- thread.join()
- print("吊篮清洗子线程运行结束")
- def point_in_region(point, region):#判断点是否在多边形内
- is_inside = cv2.pointPolygonTest(region.reshape((-1, 1, 2)), point, False)
- return is_inside >= 0
- def save_image_and_redis(redis_client, results, step_name, save_path, post_path):
- save_time = datetime.now().strftime('%Y%m%d_%H%M')
- imgpath = f"{save_path}/{step_name}_{save_time}.jpg"
- annotated_frame = results[0].plot()
- if step_name == "basket_step_2":
- annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_SUSPENSION_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- elif step_name == "basket_step_3":
- annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_STEEL_WIRE_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- elif step_name == "basket_step_4":
- annotated_frame = cv2.polylines(annotated_frame, [BASKET_PLATFORM_REGION.reshape(-1, 1, 2)], isClosed=True, color=(0, 255, 0), thickness=4)
- #cv2.fillPoly(annotated_frame, [BASKET_PLATFORM_REGION], (0, 255, 0))#填充区域
- elif step_name == "basket_step_5":
- annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_LIFTING_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- elif step_name == "basket_step_6":
- annotated_frame = cv2.polylines(annotated_frame, [region.reshape(-1, 1, 2) for region in BASKET_SAFETY_LOCK_REGION], isClosed=True, color=(0, 255, 0), thickness=4)
- elif step_name == "basket_step_7":
- annotated_frame = cv2.polylines(annotated_frame, [BASKET_ELECTRICAL_SYSTEM_REGION.reshape(-1, 1, 2)], isClosed=True, color=(0, 255, 0), thickness=4)
-
- cv2.imwrite(imgpath, annotated_frame)
- redis_client.set(step_name, post_path)
- redis_client.rpush("basket_cleaning_order", step_name)
- def get_region_mean_color(regions_security_lock_and_work_area, img):
- for region in regions_security_lock_and_work_area:
- points = np.array(region, dtype=np.int32)
- mask = np.zeros(img.shape[:2], dtype=np.uint8)
- cv2.fillPoly(mask, [points], 255)
- roi = cv2.bitwise_and(img, img, mask=mask)
- mean_color = cv2.mean(roi)
- # 将BGR颜色均值再平均
- average_color_value = sum(mean_color[:3]) / 3
- # 判断平均值是否大于0.30
- if average_color_value < 0.30:
- return True
- else:
- return False
- def process_video(model_path, video_source,start_event):
- model = YOLO(model_path)
- cap = cv2.VideoCapture(video_source)
-
- #cap.set(cv2.CAP_PROP_BUFFERSIZE, 100) # 设置缓冲区大小为 10 帧
- while cap.isOpened():
- if stop_event.is_set():#控制停止推理
- break
- success, frame = cap.read()
- if success:
- # Run YOLOv8 inference on the frame
- if cap.get(cv2.CAP_PROP_POS_FRAMES) % 25 != 0:
- continue
- results = model.predict(frame,conf=0.6,verbose=False)
- global basket_suspension_flag,basket_warning_zone_flag,basket_steel_wire_flag,basket_platform_flag,basket_electrical_system_flag,basket_lifting_flag,basket_safety_lock_flag,basket_safety_belt_flag,basket_cleaning_up_flag,basket_cleaning_operation_flag,basket_empty_load_flag,basket_person_flag
- global BASKET_PLATFORM_REGION,BASKET_LIFTING_REGION,BASKET_ELECTRICAL_SYSTEM_REGION
- for r in results:
- if model_path==BASKET_CLEANING_MODEL_SOURCES[0] and not basket_suspension_flag:#D4悬挂机构
- boxes=r.boxes.xyxy#人体的检测框
- keypoints = r.keypoints.xy
- confidences = r.keypoints.conf
- for i in range(len(boxes)):
- left_wrist, right_wrist = keypoints[i][9:11].tolist()#获取左右手腕和左右肘的坐标
- points = [left_wrist, right_wrist]
- is_inside1 = any(point_in_region(point, BASKET_SUSPENSION_REGION[0]) for point in points)
- is_inside2 = any(point_in_region(point, BASKET_SUSPENSION_REGION[1]) for point in points)
- is_inside3 = any(point_in_region(point, BASKET_SUSPENSION_REGION[2]) for point in points)
- is_inside4 = any(point_in_region(point, BASKET_SUSPENSION_REGION[3]) for point in points)
- if is_inside1 or is_inside2 or is_inside3 or is_inside4:
- basket_suspension_flag=True#悬挂机构
- print("悬挂机构")
-
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[1]:#D5吊篮悬挂
- boxes = r.boxes.xyxy
- confidences = r.boxes.conf
- classes = r.boxes.cls
- basket_warning_zone_flag=False#当检测不到则为False
- basket_cleaning_up_flag=False
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- confidence = confidences[i].item()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=='warning_zone':
- basket_warning_zone_flag=True
- elif label=='brush':
- basket_cleaning_up_flag=True
- # elif label=='person':
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[2]:#D6,pose
- boxes=r.boxes.xyxy#人体的检测框
- keypoints = r.keypoints.xy
- confidences = r.keypoints.conf
- basket_person_flag=False#当检测不到则为False
- for i in range(len(boxes)):
- #当有检测框,则说明有人
- basket_person_flag=True
- left_wrist, right_wrist = keypoints[i][9:11].tolist()#获取左右手腕和左右肘的坐标
- points = [left_wrist, right_wrist]
- if not basket_steel_wire_flag:
- is_inside1 = any(point_in_region(point, BASKET_STEEL_WIRE_REGION[0]) for point in points)
- is_inside2 = any(point_in_region(point, BASKET_STEEL_WIRE_REGION[1]) for point in points)
- #is_inside3 = any(point_in_region(point, BASKET_STEEL_WIRE_REGION[2]) for point in points)
- #is_inside4 = any(point_in_region(point, BASKET_STEEL_WIRE_REGION[3]) for point in points)
- if is_inside1 or is_inside2:
- basket_steel_wire_flag=True#钢丝绳
- print("钢丝绳")
- if not basket_platform_flag:
- is_inside = any(point_in_region(point, BASKET_PLATFORM_REGION) for point in points)
- if is_inside:
- basket_platform_flag=True
- print("平台")
- if not basket_lifting_flag:
- is_inside = any(point_in_region(point, BASKET_LIFTING_REGION) for point in points)
- if is_inside:
- basket_lifting_flag=True
- print("提升机")
- if not basket_electrical_system_flag:
- is_inside = any(point_in_region(point, BASKET_ELECTRICAL_SYSTEM_REGION) for point in points)
- if is_inside:
- basket_electrical_system_flag=True
- print("电气系统")
- if not basket_safety_lock_flag:
- is_inside1 = any(point_in_region(point, BASKET_SAFETY_LOCK_REGION[0]) for point in points)
- is_inside2 = any(point_in_region(point, BASKET_SAFETY_LOCK_REGION[1]) for point in points)
-
- if is_inside1 or is_inside2:
- basket_safety_lock_flag=True
- print("安全锁")
- if not basket_person_flag and get_region_mean_color([BASKET_EMPTY_LOAD_REGION], frame):
- basket_empty_load_flag=True
-
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[3]:#d6目标检测
- boxes = r.boxes.xyxy
- confidences = r.boxes.conf
- classes = r.boxes.cls
- #basket_safety_lock_flag=False#当检测不到则为False
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- confidence = confidences[i].item()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=='safety_belt':
- basket_safety_belt_flag=True
- elif label=='brush':
- is_inside = any(point_in_region(point,BASKET_CLEANING_OPERATION_REGION) for point in points)
- if is_inside:
- basket_cleaning_operation_flag=True
- else:
- boxes = r.boxes.xyxy
- masks = r.masks.xy
- classes = r.boxes.cls
- hoist=[]
- for i in range(len(boxes)):
- x1, y1, x2, y2 = boxes[i].tolist()
- cls = int(classes[i].item())
- label = model.names[cls]
- if label=="basket":
- BASKET_PLATFORM_REGION = np.array(masks[i].tolist(), np.int32)
- elif label=="hoist":
- #hoist.append(masks[i].tolist())
- #hoist.append(np.array(masks[i].tolist(), np.int32))
- # print(masks[i].tolist())
- BASKET_LIFTING_REGION = np.array(masks[i].tolist(),np.int32)
- #pass
- #TODO
- elif label=="electricalSystem":
- BASKET_ELECTRICAL_SYSTEM_REGION = np.array(masks[i].tolist(), np.int32)
-
- # 检查是否所有hoist masks具有相同形状
- # if all(h.shape == hoist[0].shape for h in hoist):
- # BASKET_LIFTING_REGION = np.vstack(hoist) # 堆叠成二维数组
- # else:
- # # 如果形状不同,可以手动处理,比如补齐/裁剪
- # # 这里假设你想补齐到最大的形状
- # max_shape = max(h.shape for h in hoist)
- # padded_hoist = [np.pad(h, ((0, max_shape[0] - h.shape[0]), (0, max_shape[1] - h.shape[1])), mode='constant') for h in hoist]
- # BASKET_LIFTING_REGION = np.array(padded_hoist)
- #BASKET_LIFTING_REGION = np.array(hoist,np.int32)
- #print("hoist",hoist.shape)
-
- if model_path==BASKET_CLEANING_MODEL_SOURCES[0] and not redis_client.exists("basket_step_2") and basket_suspension_flag:#D4悬挂机构
- save_image_and_redis(redis_client, results, "basket_step_2", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[1]:#D5吊篮悬挂
- if basket_warning_zone_flag and not redis_client.exists("basket_step_1"):#警戒区
- save_image_and_redis(redis_client, results, "basket_step_1", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_cleaning_up_flag and not redis_client.exists("basket_step_11"):
- save_image_and_redis(redis_client, results, "basket_step_11", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif not basket_warning_zone_flag and redis_client.exists("basket_step_1") and not redis_client.exists("basket_step_12"):
- save_image_and_redis(redis_client, results, "basket_step_12", SAVE_IMG_PATH, POST_IMG_PATH5)
-
- elif model_path==BASKET_CLEANING_MODEL_SOURCES[2]:#D6,pose
- if basket_steel_wire_flag and not redis_client.exists("basket_step_3"):
- save_image_and_redis(redis_client, results, "basket_step_3", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_platform_flag and not redis_client.exists("basket_step_4"):
- save_image_and_redis(redis_client, results, "basket_step_4", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_lifting_flag and not redis_client.exists("basket_step_5"):
- save_image_and_redis(redis_client, results, "basket_step_5", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_electrical_system_flag and not redis_client.exists("basket_step_7"):
- save_image_and_redis(redis_client, results, "basket_step_7", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_safety_lock_flag and not redis_client.exists("basket_step_6"):
- save_image_and_redis(redis_client, results, "basket_step_6", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_empty_load_flag and not redis_client.exists("basket_step_8"):
- save_image_and_redis(redis_client, results, "basket_step_8", SAVE_IMG_PATH, POST_IMG_PATH5)
- else:#d6目标检测
- if basket_safety_belt_flag and not redis_client.exists("basket_step_9"):
- save_image_and_redis(redis_client, results, "basket_step_9", SAVE_IMG_PATH, POST_IMG_PATH5)
- elif basket_cleaning_operation_flag and not redis_client.exists("basket_step_10"):
- save_image_and_redis(redis_client, results, "basket_step_10", SAVE_IMG_PATH, POST_IMG_PATH5)
- start_event.set()
- else:
- # Break the loop if the end of the video is reached
- break
- # Release the video capture object and close the display window
- cap.release()
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- del model
-
-
|