123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- import email
- from email.message import Message
- from email.policy import default
- import json
- from datetime import datetime
- from lxml import etree
def clean_xml(xml_string):
    """Strip namespaces and ``version`` attributes from an XML document.

    Args:
        xml_string: XML document to parse with ``lxml.etree.XML``; the
            caller in this file passes UTF-8 encoded bytes.

    Returns:
        The pretty-printed document as ``str`` in which every element tag
        has its ``{namespace}`` prefix removed, every ``version`` attribute
        is deleted and namespace declarations that are no longer used are
        dropped.
    """
    parser = etree.XMLParser(remove_blank_text=True)
    root = etree.XML(xml_string, parser)
    for elem in root.iter():
        # iter() also yields comments and processing instructions; their
        # ``tag`` is a factory function rather than a string, so the
        # substring test below would raise TypeError on them.
        if not isinstance(elem.tag, str):
            continue
        if '}' in elem.tag:
            # Clark notation is "{namespace}local"; keep only the local name.
            elem.tag = elem.tag.split('}', 1)[1]
        if 'version' in elem.attrib:
            del elem.attrib['version']
    # Drop the xmlns declarations that no element refers to any more.
    etree.cleanup_namespaces(root)
    return etree.tostring(root, pretty_print=True, encoding="unicode")
def parse_xml_to_dict(xml):
    """Recursively convert an XML element into nested dictionaries.

    Modelled on TensorFlow's ``recursive_parse_xml_to_dict``.

    Args:
        xml: Element node (for example from ``lxml.etree``) that exposes
            ``tag``, ``text``, ``len()`` and iteration over its children.

    Returns:
        ``{tag: text}`` for an element without children, otherwise
        ``{tag: children}`` where ``children`` maps each child tag to its
        converted value.  Children tagged ``pictureInfo``, ``ANPR`` or
        ``PictureURLInfo`` are collected into lists because they may occur
        several times; for any other repeated tag the last occurrence wins.
    """
    repeatable_tags = ('pictureInfo', 'ANPR', "PictureURLInfo")

    # Leaf element: nothing to recurse into, report its text directly.
    if not len(xml):
        return {xml.tag: xml.text}

    children = {}
    for node in xml:
        tag = node.tag
        value = parse_xml_to_dict(node)[tag]
        if tag in repeatable_tags:
            children.setdefault(tag, []).append(value)
        else:
            children[tag] = value
    return {xml.tag: children}
def parse_multipart(data, boundary):
    """Split a multipart/form-data payload into its named parts.

    Args:
        data: Raw payload as ``bytes``.
        boundary: Delimiter ``bytes`` exactly as it appears between parts in
            ``data``; the payload is split on this value verbatim.

    Returns:
        Dict mapping each part's ``name`` parameter to a dict with the keys
        ``"filename"`` (``str`` or ``None``), ``"content_type"`` (bare,
        lower-cased media type, ``""`` when the header is absent) and
        ``"content"``: the decoded object for ``application/json``, a nested
        dict for ``application/xml``, raw ``bytes`` for ``image/*`` and
        ``str`` for everything else.  Parts whose Content-Disposition has no
        ``name`` parameter are skipped; a later part with the same name
        replaces an earlier one.

    Raises:
        UnicodeDecodeError: A textual body is not valid UTF-8.
        json.JSONDecodeError: An ``application/json`` body is malformed.
    """
    parsed_data = {}
    for part in data.split(boundary):
        # Only whitespace in front of the headers may be dropped blindly;
        # the tail belongs to the body, which can be binary.
        part = part.lstrip()
        if not part:
            continue

        headers, _, body = part.partition(b"\r\n\r\n")
        if not headers:
            continue
        header_dict = email.message_from_bytes(headers, policy=default)

        # Use the parsed header parameters instead of substring splitting:
        # 'name="' also occurs inside 'filename="', and unquoted values
        # (name=foo) have no quote to split on.
        disposition = header_dict.get("Content-Disposition")
        params = getattr(disposition, "params", {})
        name = params.get("name")
        if name is None:
            continue
        filename = params.get("filename")

        # Compare on the bare media type so that parameters such as
        # "; charset=utf-8" and letter case do not defeat the dispatch.
        raw_type = str(header_dict.get("Content-Type", ""))
        content_type = raw_type.split(";", 1)[0].strip().lower()

        if content_type.startswith("image/"):
            # Binary payload: remove only the single CRLF that precedes the
            # next delimiter, never bytes that merely look like whitespace.
            body_content = body[:-2] if body.endswith(b"\r\n") else body
        else:
            body = body.rstrip()
            if content_type == "application/json":
                body_content = json.loads(body.decode("utf-8"))
            elif content_type == "application/xml":
                # Repair malformed closing tags of the form "</ tag>" before
                # handing the document to the XML parser.
                body = body.decode('utf-8').replace("</ ", "</").encode()
                clean_xml_str = clean_xml(body)
                xml = etree.fromstring(clean_xml_str)
                body_content = parse_xml_to_dict(xml)
            else:
                body_content = body.decode()

        parsed_data[name] = {
            "filename": filename,
            "content_type": content_type,
            "content": body_content,
        }
    return parsed_data
def parser_json_data(data):
    """Collect vehicle event records and JPEG images from parsed parts.

    Args:
        data: Mapping of part name to a dict with the keys
            ``"content_type"``, ``"content"`` and ``"filename"``.

    Returns:
        Tuple ``(infos, images)``.  ``infos`` holds one dict per
        ``application/xml`` part with the keys ``"dateTime"`` (formatted as
        ``YYYY-MM-DD HH:MM:SS``), ``"pictureInfo"``, ``"VehicleInfo"`` and
        ``"PlateInfo"``.  ``images`` holds one ``{"filename", "content"}``
        dict per ``image/jpeg`` part.  Records with a missing or invalid
        required field are skipped rather than raising.
    """
    infos = []
    images = []
    for value in data.values():
        content_type = value.get("content_type")

        if content_type == "application/xml":
            content = value.get("content")
            if content is None:
                continue
            event_alert = content.get("EventNotificationAlert")
            if event_alert is None:
                continue

            # A missing (None) or malformed timestamp is treated like any
            # other missing required field: skip the record.
            try:
                dt = datetime.fromisoformat(event_alert.get("dateTime"))
            except (TypeError, ValueError):
                continue

            tfs = event_alert.get("TFS")
            if tfs is None:
                continue
            vehicle_info = tfs.get("VehicleInfo")
            plate_info = tfs.get("PlateInfo")

            # The key may be present with a None value (an empty XML
            # element), in which case a .get() default would not apply.
            picture_container = event_alert.get("PictureURLInfoList") or {}
            picture_info_list = picture_container.get("PictureURLInfo", [])

            if vehicle_info is None or plate_info is None or not picture_info_list:
                continue
            infos.append({
                "dateTime": dt.strftime("%Y-%m-%d %H:%M:%S"),
                "pictureInfo": picture_info_list,
                "VehicleInfo": vehicle_info,
                "PlateInfo": plate_info,
            })
        elif content_type == "image/jpeg":
            filename = value.get("filename")
            image_content = value.get("content")
            # Keep only images that have both a name and payload bytes.
            if filename and image_content:
                images.append({
                    "filename": filename,
                    "content": image_content,
                })
    return infos, images
-
if __name__ == "__main__":
    # Manual smoke test: parse a captured multipart payload and print the
    # extracted event records and images.
    import ast

    # tfs_0.txt is read as a Python literal; parse_multipart() works on
    # bytes, so the file presumably holds a bytes literal (b'...') --
    # confirm against the capture tool.  literal_eval only accepts literals,
    # so unlike eval() it cannot execute code embedded in the file.
    with open("tfs_0.txt", "r") as f:
        multipart_data = ast.literal_eval(f.read())

    boundary = b'---------------------------7e13971310878'
    result = parse_multipart(multipart_data, boundary)
    print(result)
    infos, images = parser_json_data(result)
    print(infos, "======", images)
|