import ast
import email
import json
from datetime import datetime
from email.message import Message
from email.policy import default

from lxml import etree

|
|
|
def clean_xml(xml_string):
    """Strip XML namespaces (including xmlns declarations) from a document.

    Every element's tag is reduced to its local name, any ``version``
    attribute is dropped, and leftover namespace declarations are cleaned
    up before re-serializing.

    Args:
        xml_string: raw XML content (bytes, or str without an encoding
            declaration) parseable by ``lxml.etree``.

    Returns:
        The namespace-free document as a pretty-printed unicode string.
    """
    root = etree.XML(xml_string, etree.XMLParser(remove_blank_text=True))

    for node in root.iter():
        # Namespaced tags look like "{uri}localname" -- keep the local part.
        if '}' in node.tag:
            node.tag = node.tag.split('}', 1)[1]
        # Drop the schema "version" attribute wherever it appears.
        if 'version' in node.attrib:
            del node.attrib['version']

    etree.cleanup_namespaces(root)
    return etree.tostring(root, pretty_print=True, encoding="unicode")
|
|
|
+
|
|
|
def parse_xml_to_dict(xml):
    """Recursively convert an XML element into a nested dictionary.

    Modeled after TensorFlow's ``recursive_parse_xml_to_dict``.

    Args:
        xml: an element obtained from ``lxml.etree`` (any ElementTree-style
            element with ``tag``, ``text`` and child iteration works).

    Returns:
        dict mapping ``xml.tag`` to the element's text (for leaf elements)
        or to a dict of its parsed children.  Children tagged
        ``pictureInfo`` / ``ANPR`` / ``PictureURLInfo`` may repeat under one
        parent, so their values are accumulated into lists.
    """
    # Leaf element: nothing below it, just report its text content.
    if not len(xml):
        return {xml.tag: xml.text}

    children = {}
    for node in xml:
        parsed = parse_xml_to_dict(node)  # recurse into the subtree
        if node.tag in ('pictureInfo', 'ANPR', 'PictureURLInfo'):
            # These tags can occur multiple times; collect them in a list.
            children.setdefault(node.tag, []).append(parsed[node.tag])
        else:
            children[node.tag] = parsed[node.tag]
    return {xml.tag: children}
|
|
|
+
|
|
|
+
|
|
|
def parse_multipart(data, boundary):
    """Parse a raw multipart/form-data payload into a dict keyed by part name.

    Args:
        data: the full multipart body as bytes.
        boundary: the boundary delimiter as bytes (including leading dashes).

    Returns:
        dict mapping each part's ``name`` (from Content-Disposition) to::

            {
                "filename": filename from Content-Disposition, or None,
                "content_type": the part's Content-Type header value,
                "content": parsed JSON (dict) / parsed XML (dict) /
                           raw bytes for image/* / decoded text otherwise,
            }

    Parts whose Content-Disposition carries no ``name=`` are skipped.
    (BUGFIX: the old code assigned ``name``/``filename`` only inside the
    ``name=`` check but stored ``parsed_data[name]`` unconditionally,
    raising NameError or reusing a stale name for nameless parts.)
    """
    parsed_data = {}
    for part in data.split(boundary):
        part = part.strip()
        if not part:
            continue

        # Split the part into its header block and body.
        headers, _, body = part.partition(b"\r\n\r\n")
        if not headers:
            continue
        header_dict = email.message_from_bytes(headers, policy=default)

        content_disposition = header_dict.get("Content-Disposition", "")
        # Only parts that actually declare a field name can be stored.
        if "name=" not in content_disposition:
            continue
        name = content_disposition.split('name="')[1].split('"')[0]
        filename = None
        if 'filename="' in content_disposition:
            filename = content_disposition.split('filename="')[1].split('"')[0]

        content_type = header_dict.get("Content-Type", "").strip()

        # Decode the body according to its declared content type.
        if content_type == "application/json":
            body_content = json.loads(body.decode("utf-8"))
        elif content_type == "application/xml":
            # Repair a device quirk ("</ tag>" instead of "</tag>")
            # before handing the bytes to the XML parser.
            body = body.decode('utf-8').replace("</ ", "</").encode()
            clean_xml_str = clean_xml(body)
            xml = etree.fromstring(clean_xml_str)
            body_content = parse_xml_to_dict(xml)
        elif content_type.startswith("image/"):
            body_content = body  # keep images as raw bytes
        else:
            body_content = body.decode()  # fall back to plain text

        parsed_data[name] = {
            "filename": filename,
            "content_type": content_type,
            "content": body_content,
        }
    return parsed_data
|
|
|
+
|
|
|
+
|
|
|
def parser_json_data(data):
    """Split parsed multipart parts into event-info records and images.

    Args:
        data: dict as returned by ``parse_multipart`` -- each value holds
            ``content_type``, ``filename`` and ``content``.

    Returns:
        (infos, images) tuple where
            infos:  list of dicts with keys ``dateTime`` (reformatted as
                    "%Y-%m-%d %H:%M:%S"), ``pictureInfo``, ``VehicleInfo``
                    and ``PlateInfo``;
            images: list of dicts with keys ``filename`` and ``content``.

    Records missing any required field are skipped rather than raising.
    """
    infos = []
    images = []

    for value in data.values():
        content_type = value.get("content_type")
        if not content_type:
            continue

        if content_type == "application/xml":
            content = value.get("content")
            if content is None:
                continue  # skip records with empty content

            event_alert = content.get("EventNotificationAlert")
            if event_alert is None:
                continue  # no alert payload in this record

            # BUGFIX: guard against a missing timestamp -- the old code
            # passed None straight into datetime.fromisoformat (TypeError).
            iso_time = event_alert.get("dateTime")
            if iso_time is None:
                continue
            dateTime = datetime.fromisoformat(iso_time).strftime("%Y-%m-%d %H:%M:%S")

            tfs = event_alert.get("TFS")
            if tfs is None:
                continue  # no traffic event block

            VehicleInfo = tfs.get("VehicleInfo")
            PlateInfo = tfs.get("PlateInfo")
            picture_info_list = event_alert.get("PictureURLInfoList", {}).get("PictureURLInfo", [])

            # All of these are required for a usable record.
            if VehicleInfo is None or PlateInfo is None or not picture_info_list:
                continue

            infos.append({
                "dateTime": dateTime,
                "pictureInfo": picture_info_list,
                "VehicleInfo": VehicleInfo,
                "PlateInfo": PlateInfo,
            })

        elif content_type == "image/jpeg":
            filename = value.get("filename")
            image_content = value.get("content")
            # Only keep images whose name and payload are both present.
            if filename and image_content:
                images.append({
                    "filename": filename,
                    "content": image_content,
                })

    return infos, images
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # The capture file stores a Python bytes literal (the repr of a raw
    # multipart payload).  SECURITY FIX: ast.literal_eval parses literals
    # only, replacing eval(), which would execute arbitrary code from the
    # file.
    with open("tfs_0.txt", "r") as f:
        multipart_data = ast.literal_eval(f.read())

    # Boundary observed in the captured device payload.
    boundary = b'---------------------------7e13971310878'
    result = parse_multipart(multipart_data, boundary)
    print(result)

    infos, images = parser_json_data(result)
    print(infos, "======", images)
|