1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import cv2
- import base64
- import threading
- import requests
# Maps each supported task name to the prompt text sent to the LLM service
# in the "text" field of the request built by task().
prompt_words = dict(
    phone="1",
    fall="2",
    fire="3",
)
def cv2_to_base64(image):
    """Encode an OpenCV image as a base64 JPEG string.

    Args:
        image: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG-compressed image as base64 text (``str``).

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    ok, jpeg_buffer = cv2.imencode('.jpg', image)
    if not ok:
        raise ValueError("failed to encode image as JPEG")
    # Decode the base64 bytes explicitly instead of slicing the repr of a
    # bytes object (``str(b"...")[2:-1]``); base64 output is pure ASCII.
    return base64.b64encode(jpeg_buffer).decode("ascii")
def get_cut_position(boxes, w, h):
    """Return the crop rectangle that encloses all boxes plus a margin.

    The bounding union of ``boxes`` is grown on every side by a fraction of
    its own width/height (1/5 when there is a single box, 1/2 when there are
    several) and then clamped to the image bounds.

    Args:
        boxes: Sequence of ``[x1, y1, x2, y2]`` boxes in pixel coordinates.
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        ``(xmin, ymin, xmax, ymax)`` with every coordinate inside
        ``[0, w]`` / ``[0, h]``. When ``boxes`` is empty the whole image,
        ``(0, 0, w, h)``, is returned.
    """
    # len() rather than truthiness so that array-like inputs also work.
    if len(boxes) == 0:
        return 0, 0, w, h

    # Union of all boxes; int() keeps the result usable as slice indices
    # even when the detector hands back float or numpy scalar coordinates.
    xmin = int(min(box[0] for box in boxes))
    ymin = int(min(box[1] for box in boxes))
    xmax = int(max(box[2] for box in boxes))
    ymax = int(max(box[3] for box in boxes))

    # A single box gets a tight margin, several boxes a generous one.
    divisor = 5 if len(boxes) == 1 else 2
    margin_x = (xmax - xmin) // divisor
    margin_y = (ymax - ymin) // divisor

    # Clamp to the image. The lower edge must use min(h, ...): using max()
    # here would always stretch the crop down to the bottom of the image.
    xmin = min(w, max(0, xmin - margin_x))
    ymin = min(h, max(0, ymin - margin_y))
    xmax = max(0, min(w, xmax + margin_x))
    ymax = max(0, min(h, ymax + margin_y))
    return xmin, ymin, xmax, ymax
def post_illegal_data():
    """Placeholder hook for reporting a violation; currently does nothing."""
    return None
def task(boxes, image, task_name, llm_url, timeout=60):
    """Crop the violation region and ask the LLM service to confirm it.

    The region around ``boxes`` is cut out of ``image``, JPEG/base64 encoded
    and posted to ``llm_url`` together with the prompt for ``task_name``.
    When the service answers ``data.illegal == 1`` the violation is reported
    through ``post_illegal_data()``. Network and response errors are printed
    and the check is skipped.

    Args:
        boxes: List of violation boxes ``[[x1, y1, x2, y2], ...]``.
        image: Image in OpenCV format.
        task_name: Task name - ``phone``: making a phone call,
            ``fall``: falling down, ``fire``: smoke and flame.
        llm_url: URL of the LLM service endpoint.
        timeout: Seconds to wait for the service before giving up.

    Raises:
        KeyError: If ``task_name`` is not a key of ``prompt_words``.
    """
    prompt_word = prompt_words[task_name]

    # shape[:2] also works for single-channel (2-D) images.
    h, w = image.shape[:2]
    xmin, ymin, xmax, ymax = get_cut_position(boxes, w, h)
    crop_image = image[ymin:ymax, xmin:xmax]
    if crop_image.size == 0:
        # Nothing to encode; the JPEG encoder rejects empty images.
        print(f"skip {task_name} check: empty crop region")
        return

    data = {
        "imageData": cv2_to_base64(crop_image),
        "text": prompt_word,
    }
    try:
        # Without a timeout a stalled server would block this worker forever.
        response = requests.post(llm_url, json=data, timeout=timeout)
        response.raise_for_status()
        res = response.json()
    except (requests.RequestException, ValueError) as error:
        # ValueError covers a body that is not valid JSON.
        print(error)
        return

    try:
        illegal = res["data"]["illegal"]
    except (KeyError, TypeError, IndexError):
        print(f"unexpected LLM response: {res!r}")
        return

    if illegal == 1:
        post_illegal_data()
def create_llm_task(boxes, image, task_name, llm_url):
    """Start ``task`` with the given arguments in a background daemon thread."""
    worker_args = (boxes, image, task_name, llm_url)
    worker = threading.Thread(
        target=task,
        args=worker_args,
        daemon=True,
    )
    worker.start()
|