|
@@ -0,0 +1,186 @@
|
|
|
|
+import torch
|
|
|
|
+import torch.nn as nn
|
|
|
|
+import torch.nn.functional as F
|
|
|
|
+
|
|
|
|
+from pydantic import BaseModel
|
|
|
|
+from ultralytics import YOLO
|
|
|
|
+
|
|
|
|
+import os
|
|
|
|
+import cv2
|
|
|
|
+import numpy as np
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class Model(nn.Module):
|
|
|
|
+ def __init__(self, A, nnode, nfeature, nclass):
|
|
|
|
+ super().__init__()
|
|
|
|
+ self.fc1 = nn.Linear(nnode * nfeature, 512)
|
|
|
|
+ self.fc2 = nn.Linear(512, nclass)
|
|
|
|
+
|
|
|
|
+ def forward(self, x):
|
|
|
|
+ x = x.view(-1, int(x.size(1) * x.size(2)))
|
|
|
|
+ x = F.relu(self.fc1(x))
|
|
|
|
+ x = F.dropout(x, 0.7, training=self.training)
|
|
|
|
+ return self.fc2(x)
|
|
|
|
+
|
|
|
|
+def extract_keypoint(get_keypoint, keypoint):
|
|
|
|
+ # nose
|
|
|
|
+ nose_x, nose_y = keypoint[get_keypoint.NOSE]
|
|
|
|
+ # eye
|
|
|
|
+ # left_eye_x, left_eye_y = keypoint[get_keypoint.LEFT_EYE]
|
|
|
|
+ # right_eye_x, right_eye_y = keypoint[get_keypoint.RIGHT_EYE]
|
|
|
|
+ # # ear
|
|
|
|
+ # left_ear_x, left_ear_y = keypoint[get_keypoint.LEFT_EAR]
|
|
|
|
+ # right_ear_x, right_ear_y = keypoint[get_keypoint.RIGHT_EAR]
|
|
|
|
+ # shoulder
|
|
|
|
+ left_shoulder_x, left_shoulder_y = keypoint[get_keypoint.LEFT_SHOULDER]
|
|
|
|
+ right_shoulder_x, right_shoulder_y = keypoint[get_keypoint.RIGHT_SHOULDER]
|
|
|
|
+ # elbow
|
|
|
|
+ left_elbow_x, left_elbow_y = keypoint[get_keypoint.LEFT_ELBOW]
|
|
|
|
+ right_elbow_x, right_elbow_y = keypoint[get_keypoint.RIGHT_ELBOW]
|
|
|
|
+ # wrist
|
|
|
|
+ left_wrist_x, left_wrist_y = keypoint[get_keypoint.LEFT_WRIST]
|
|
|
|
+ right_wrist_x, right_wrist_y = keypoint[get_keypoint.RIGHT_WRIST]
|
|
|
|
+ # hip
|
|
|
|
+ left_hip_x, left_hip_y = keypoint[get_keypoint.LEFT_HIP]
|
|
|
|
+ right_hip_x, right_hip_y = keypoint[get_keypoint.RIGHT_HIP]
|
|
|
|
+ # knee
|
|
|
|
+ left_knee_x, left_knee_y = keypoint[get_keypoint.LEFT_KNEE]
|
|
|
|
+ right_knee_x, right_knee_y = keypoint[get_keypoint.RIGHT_KNEE]
|
|
|
|
+ # ankle
|
|
|
|
+ left_ankle_x, left_ankle_y = keypoint[get_keypoint.LEFT_ANKLE]
|
|
|
|
+ right_ankle_x, right_ankle_y = keypoint[get_keypoint.RIGHT_ANKLE]
|
|
|
|
+
|
|
|
|
+ return [
|
|
|
|
+ nose_x, nose_y ,
|
|
|
|
+ left_shoulder_x, left_shoulder_y ,
|
|
|
|
+ right_shoulder_x, right_shoulder_y,
|
|
|
|
+ left_elbow_x, left_elbow_y,
|
|
|
|
+ right_elbow_x, right_elbow_y,
|
|
|
|
+ left_wrist_x, left_wrist_y,
|
|
|
|
+ right_wrist_x, right_wrist_y,
|
|
|
|
+ left_hip_x, left_hip_y,
|
|
|
|
+ right_hip_x, right_hip_y,
|
|
|
|
+ left_knee_x, left_knee_y,
|
|
|
|
+ right_knee_x, right_knee_y,
|
|
|
|
+ left_ankle_x, left_ankle_y,
|
|
|
|
+ right_ankle_x, right_ankle_y
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class GetKeypoint(BaseModel):
|
|
|
|
+ NOSE: int = 0
|
|
|
|
+ LEFT_EYE: int = 1
|
|
|
|
+ RIGHT_EYE: int = 2
|
|
|
|
+ LEFT_EAR: int = 3
|
|
|
|
+ RIGHT_EAR: int = 4
|
|
|
|
+ LEFT_SHOULDER: int = 5
|
|
|
|
+ RIGHT_SHOULDER: int = 6
|
|
|
|
+ LEFT_ELBOW: int = 7
|
|
|
|
+ RIGHT_ELBOW: int = 8
|
|
|
|
+ LEFT_WRIST: int = 9
|
|
|
|
+ RIGHT_WRIST: int = 10
|
|
|
|
+ LEFT_HIP: int = 11
|
|
|
|
+ RIGHT_HIP: int = 12
|
|
|
|
+ LEFT_KNEE: int = 13
|
|
|
|
+ RIGHT_KNEE: int = 14
|
|
|
|
+ LEFT_ANKLE: int = 15
|
|
|
|
+ RIGHT_ANKLE: int = 16
|
|
|
|
+
|
|
|
|
+class Box(BaseModel):
|
|
|
|
+ left : int
|
|
|
|
+ top : int
|
|
|
|
+ right : int
|
|
|
|
+ bottom : int
|
|
|
|
+ box_conf : float
|
|
|
|
+ pose_classifer_conf : float
|
|
|
|
+ label : str
|
|
|
|
+
|
|
|
|
+class YoloPoseClassfier(object):
|
|
|
|
+ def __init__(self, device_id, yolo_pose_model_path, pose_classfier_model_path, confidence_threshold=0.5):
|
|
|
|
+ self.confidence_threshold = confidence_threshold
|
|
|
|
+ self.device = torch.device("cpu") if device_id == -1 else torch.device(f"cuda:{device_id}")
|
|
|
|
+ # define get key point function
|
|
|
|
+ self.get_keypoint = GetKeypoint()
|
|
|
|
+
|
|
|
|
+ # load model
|
|
|
|
+ self.pose_classfier_model = Model(None, 13, 2, 2)
|
|
|
|
+ self.pose_classfier_model.load_state_dict(torch.load(pose_classfier_model_path))
|
|
|
|
+ self.pose_classfier_model.eval()
|
|
|
|
+ self.pose_classfier_model.to(self.device)
|
|
|
|
+
|
|
|
|
+ self.yolo_pose_model = YOLO(yolo_pose_model_path)
|
|
|
|
+ self.yolo_pose_model.to(self.device)
|
|
|
|
+
|
|
|
|
+ # 新增躯干角度校验函数
|
|
|
|
+ def calculate_torso_angle(keypoints):
|
|
|
|
+ # 获取肩膀(索引5、6)和臀部(索引11、12)关键点
|
|
|
|
+ left_shoulder = keypoints[5]
|
|
|
|
+ right_shoulder = keypoints[6]
|
|
|
|
+ left_hip = keypoints[11]
|
|
|
|
+ right_hip = keypoints[12]
|
|
|
|
+
|
|
|
|
+ # 计算肩膀中点与臀部中点的向量
|
|
|
|
+ shoulder_center = ((left_shoulder[0] + right_shoulder[0]) / 2,
|
|
|
|
+ (left_shoulder[1] + right_shoulder[1]) / 2)
|
|
|
|
+ hip_center = ((left_hip[0] + right_hip[0]) / 2,
|
|
|
|
+ (left_hip[1] + right_hip[1]) / 2)
|
|
|
|
+
|
|
|
|
+ # 计算与垂直方向的夹角(跌倒时角度接近90度)
|
|
|
|
+ dx = hip_center[0] - shoulder_center[0]
|
|
|
|
+ dy = hip_center[1] - shoulder_center[1]
|
|
|
|
+ angle = np.degrees(np.arctan2(abs(dx), abs(dy)))
|
|
|
|
+ return angle
|
|
|
|
+
|
|
|
|
+ def __call__(self, image : np.ndarray):
|
|
|
|
+ pose_classfier_results = []
|
|
|
|
+ yolo_pose_results = self.yolo_pose_model(image, conf=self.confidence_threshold)
|
|
|
|
+ for result in yolo_pose_results[0]:
|
|
|
|
+ boxes = result.boxes.xyxy.cpu().numpy().tolist()
|
|
|
|
+ confs = result.boxes.conf.cpu().numpy().tolist()
|
|
|
|
+ all_keypoints = result.keypoints.data.cpu().numpy().tolist()
|
|
|
|
+ for box, conf, keypoints in zip(boxes, confs, all_keypoints):
|
|
|
|
+ x1, y1, x2, y2 = box
|
|
|
|
+ x, y, w, h = x1, y1, x2 - x1, y2 - y1
|
|
|
|
+ n_keypoints = [[(kp[0] - x) / w - 0.5, (kp[1] - y) / h - 0.5] if kp[0] > 0 and kp[1] > 0 else kp[:2] for kp in keypoints]
|
|
|
|
+ n_keypoints = extract_keypoint(self.get_keypoint, n_keypoints)
|
|
|
|
+ if n_keypoints[-12:].count(0) >= 2 * 2:
|
|
|
|
+ continue
|
|
|
|
+ if n_keypoints.count(0) >= 4 * 2:
|
|
|
|
+ continue
|
|
|
|
+ if w < h:
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ pose_data = torch.Tensor([n_keypoints]).to(self.device)
|
|
|
|
+ pose_data = pose_data.reshape(1, 13, 2)
|
|
|
|
+ with torch.no_grad():
|
|
|
|
+ p = self.pose_classfier_model(pose_data)
|
|
|
|
+ prob = F.softmax(p)
|
|
|
|
+ index = prob.argmax()
|
|
|
|
+ if index == 0:
|
|
|
|
+ score = float(prob[0][index].cpu().numpy())
|
|
|
|
+ pose_classfier_results.append(
|
|
|
|
+ Box(left=x1, top=y1, right=x2, bottom=y2, box_conf=conf, pose_classifer_conf=score, label="falling"))
|
|
|
|
+ return pose_classfier_results
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
+ yolo_pose_model_path = "yolo11l-pose.pt"
|
|
|
|
+ pose_classfier_model_path = "fall_cls.pth"
|
|
|
|
+
|
|
|
|
+ model = YoloPoseClassfier(-1, yolo_pose_model_path, pose_classfier_model_path, 0.5)
|
|
|
|
+
|
|
|
|
+ test_images_dir = r"C:\Users\86187\Desktop\fall_test"
|
|
|
|
+ save_image_dir = "."
|
|
|
|
+
|
|
|
|
+ for image_name in os.listdir(test_images_dir):
|
|
|
|
+ print(image_name)
|
|
|
|
+ image_path = os.path.join(test_images_dir, image_name)
|
|
|
|
+ image = cv2.imread(image_path)
|
|
|
|
+ pose_classfier_results = model(image)
|
|
|
|
+ for res in pose_classfier_results:
|
|
|
|
+ cv2.rectangle(image, (int(res.left), int(res.top)), (int(res.right), int(res.bottom)), (0, 255, 0), 2)
|
|
|
|
+ cv2.imwrite(os.path.join(save_image_dir, image_name), image)
|
|
|
|
+
|
|
|
|
+
|