Files
get_wechat/chatlog_fastAPI/services/message_formatter.py

254 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import html
import json
import re
import xml.etree.ElementTree as ET
from typing import Any
# Maximum number of characters clean_message_text() keeps; longer text is cut
# to this length and "..." is appended.
QUOTE_CONTENT_LIMIT = 600
def extract_contents(item: dict) -> dict:
    """Return the ``contents`` / ``Contents`` mapping of *item*.

    Args:
        item: Message dict to read from.

    Returns:
        The first truthy value among ``item["contents"]`` and
        ``item["Contents"]`` when that value is a dict; otherwise a new
        empty dict.
    """
    candidate = item.get("contents") or item.get("Contents") or {}
    if isinstance(candidate, dict):
        return candidate
    # Anything that is not a mapping (string, list, ...) is ignored.
    return {}
def clean_message_text(value: Any) -> str:
    """Turn *value* into a single-line, length-limited display string.

    Falsy values become ``""``. Otherwise the value is converted with
    ``str``, HTML entities are unescaped, surrounding whitespace is removed
    and every internal whitespace run is collapsed to one space.

    Args:
        value: Any object; typically a string taken from a message payload.

    Returns:
        The cleaned text. When it is longer than ``QUOTE_CONTENT_LIMIT``
        characters it is cut to that length and ``"..."`` is appended.
    """
    raw = str(value or "")
    collapsed = re.sub(r"\s+", " ", html.unescape(raw).strip())
    if len(collapsed) <= QUOTE_CONTENT_LIMIT:
        return collapsed
    return collapsed[:QUOTE_CONTENT_LIMIT] + "..."
def _local_name(tag: str) -> str:
return tag.rsplit("}", 1)[-1]
def _safe_int(value: Any) -> int | None:
if value in (None, ""):
return None
try:
return int(str(value).strip())
except Exception:
return None
def _first(data: dict, *keys: str) -> Any:
for key in keys:
value = data.get(key)
if value not in (None, ""):
return value
return None
def _has_quote_indicator(data: dict) -> bool:
keys = {str(key) for key in data.keys()}
indicators = {
"quote",
"refermsg",
"referMsg",
"refer",
"recordInfo",
"recordinfo",
"fromusr",
"fromUser",
"chatusr",
"chatUser",
"displayname",
"displayName",
"referContent",
"svrid",
"newmsgid",
"newMsgId",
}
return bool(keys & indicators)
def _decode_json(value: str) -> Any:
try:
return json.loads(value)
except Exception:
return None
def _xml_node_text(node: ET.Element, names: set[str]) -> str:
    """Return the text of the first non-empty element named in *names*.

    *node* and all of its descendants are visited in the order produced by
    ``node.iter()``. An element matches when its tag, with any namespace
    prefix removed, is in *names* (case-sensitive). The element's text is
    the concatenation of all text nested inside it, stripped.

    Returns:
        The first non-empty text found, or ``""`` when nothing matches.
    """
    for element in node.iter():
        if _local_name(element.tag) not in names:
            continue
        joined = "".join(element.itertext()).strip()
        if joined:
            return joined
    return ""
def _quote_from_xml(value: str) -> dict | None:
    """Build a normalized quote from XML text that contains a ``refermsg`` element.

    The text is parsed in its HTML-unescaped form first, so payloads whose
    markup arrives entity-encoded (``&lt;msg&gt;...``) are still recognised.
    Unescaping also turns the XML escapes of an already well-formed document
    (``&amp;``, ``&lt;``) into bare ``&`` / ``<``, which can make it
    unparseable; when the unescaped form cannot be parsed, the text as
    received is tried as well. Each form is attempted as-is and then wrapped
    in a synthetic ``<root>`` element to tolerate fragments.

    Args:
        value: Candidate XML text; ``None`` or empty is treated as ``""``.

    Returns:
        The dict produced by ``_normalize_quote`` for the first element whose
        local tag name is ``refermsg`` (case-insensitive), or ``None`` when
        the text has no markup, cannot be parsed, contains no such element,
        or the element yields no content.
    """
    as_received = (value or "").strip()
    unescaped = html.unescape(value or "").strip()
    if "<" not in unescaped or ">" not in unescaped:
        return None

    # Parse attempts in priority order. The unescaped forms come first so that
    # every input that parsed before is still interpreted the same way.
    candidates = [unescaped, f"<root>{unescaped}</root>"]
    if as_received != unescaped:
        candidates.extend((as_received, f"<root>{as_received}</root>"))

    # NOTE(review): ET.fromstring is not hardened against maliciously
    # constructed XML; message payloads are presumably untrusted - confirm
    # whether a hardened parser is required here.
    root = None
    for candidate in candidates:
        try:
            root = ET.fromstring(candidate)
        except Exception:
            # Best-effort parsing: move on to the next candidate form.
            continue
        break
    if root is None:
        return None

    refer_node = next(
        (node for node in root.iter() if _local_name(node.tag).lower() == "refermsg"),
        None,
    )
    if refer_node is None:
        return None

    return _normalize_quote(
        {
            "sender": _xml_node_text(refer_node, {"fromusr", "chatusr", "sender"}),
            "sender_name": _xml_node_text(
                refer_node, {"displayname", "nickname", "fromnickname"}
            ),
            "content": _xml_node_text(refer_node, {"content", "title", "desc"}),
            "type": _safe_int(_xml_node_text(refer_node, {"type"})),
            "seq": _safe_int(
                _xml_node_text(refer_node, {"seq", "msgid", "newmsgid", "svrid"})
            ),
        }
    )
def _find_quote_payload(value: Any, allow_plain_text: bool = False) -> dict | None:
    """Recursively search *value* for something that normalizes to a quote.

    Handling depends on the type of *value*:

    * ``str``: text starting with ``{`` or ``[`` is decoded as JSON and the
      decoded value is searched; otherwise the text is tried as ``refermsg``
      XML; finally, if *allow_plain_text* is set, the text itself becomes the
      quote content.
    * ``list``: each element is searched in order.
    * ``dict``: the known wrapper keys are searched first (their values may
      be plain text); then the dict itself is normalized when
      *allow_plain_text* is set or it carries a quote indicator key; finally
      every value is searched with plain text disallowed.
    * anything else, ``None`` and ``""``: no quote.

    Args:
        value: Payload fragment to inspect.
        allow_plain_text: Whether a bare string (or a dict without indicator
            keys) may be accepted as the quote itself.

    Returns:
        The first normalized quote dict found, or ``None``.
    """
    if value in (None, ""):
        return None

    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return None
        if stripped.startswith(("{", "[")):
            parsed = _decode_json(stripped)
            if parsed is not None:
                return _find_quote_payload(parsed, allow_plain_text=allow_plain_text)
        from_xml = _quote_from_xml(stripped)
        if from_xml:
            return from_xml
        if not allow_plain_text:
            return None
        return _normalize_quote({"content": stripped})

    if isinstance(value, list):
        for entry in value:
            hit = _find_quote_payload(entry, allow_plain_text=allow_plain_text)
            if hit:
                return hit
        return None

    if not isinstance(value, dict):
        return None

    # 1) Values stored under a known wrapper key are quotes by declaration.
    for wrapper in ("quote", "refermsg", "referMsg", "refer", "recordInfo", "recordinfo"):
        if wrapper not in value:
            continue
        hit = _find_quote_payload(value.get(wrapper), allow_plain_text=True)
        if hit:
            return hit

    # 2) The dict itself may already be the quote.
    if allow_plain_text or _has_quote_indicator(value):
        hit = _normalize_quote(value)
        if hit:
            return hit

    # 3) Otherwise look deeper, no longer accepting bare strings.
    for child in value.values():
        hit = _find_quote_payload(child, allow_plain_text=False)
        if hit:
            return hit
    return None
def _normalize_quote(data: dict) -> dict | None:
    """Map a loosely-structured quote dict onto the canonical quote shape.

    Each output field is taken from the first alias key in *data* that holds
    a value other than ``None`` / ``""``. Text fields are passed through
    ``clean_message_text`` and numeric fields through ``_safe_int``.

    Args:
        data: Source mapping using any of the supported key spellings.

    Returns:
        A dict with keys ``sender``, ``sender_name``, ``content``, ``type``
        and ``seq``, or ``None`` when no non-empty content can be found.
    """
    content_keys = (
        "content",
        "Content",
        "text",
        "title",
        "desc",
        "digest",
        "displayContent",
        "referContent",
    )
    body = clean_message_text(_first(data, *content_keys))
    if not body:
        # A quote without content is not worth reporting.
        return None

    sender_keys = ("sender", "Sender", "fromusr", "fromUser", "chatusr", "chatUser", "from")
    name_keys = (
        "sender_name",
        "senderName",
        "SenderName",
        "displayname",
        "displayName",
        "nickname",
        "nickName",
    )
    type_keys = ("type", "Type", "msgType", "subType")
    seq_keys = ("seq", "Seq", "sort_seq", "msgid", "msgId", "newmsgid", "newMsgId", "svrid")

    return {
        "sender": clean_message_text(_first(data, *sender_keys)),
        "sender_name": clean_message_text(_first(data, *name_keys)),
        "content": body,
        "type": _safe_int(_first(data, *type_keys)),
        "seq": _safe_int(_first(data, *seq_keys)),
    }
def extract_quote(item: dict | None) -> dict | None:
    """Find the quoted (referenced) message attached to *item*, if any.

    Fields that explicitly hold a quote are checked first, on the item and
    on its contents mapping, and may contain plain text. After that the
    ``appmsg`` contents entry and the item's own ``content`` / ``Content``
    are searched, where only structured data is accepted as a quote.

    Args:
        item: Message dict; any non-dict value yields ``None``.

    Returns:
        The first normalized quote dict found, or ``None``.
    """
    if not isinstance(item, dict):
        return None
    contents = extract_contents(item)

    explicit = [item.get(key) for key in ("quote", "Quote", "refer", "recordInfo")]
    explicit += [
        contents.get(key)
        for key in ("quote", "refer", "refermsg", "referMsg", "recordInfo", "recordinfo")
    ]
    implicit = [contents.get("appmsg"), item.get("content"), item.get("Content")]

    for sources, plain_text_ok in ((explicit, True), (implicit, False)):
        for source in sources:
            found = _find_quote_payload(source, allow_plain_text=plain_text_ok)
            if found:
                return found
    return None
def attach_quote(item: dict) -> dict:
    """Store the extracted quote under ``item["quote"]`` and return *item*.

    The dict is modified in place; the key is set to ``None`` when no quote
    is found.
    """
    quote = extract_quote(item)
    item["quote"] = quote
    return item
def quote_to_text(quote: dict | None) -> str:
if not quote:
return ""
sender = quote.get("sender_name") or quote.get("sender") or "未知"
seq = quote.get("seq")
seq_text = f" seq={seq}" if seq else ""
return f"[引用消息{seq_text}] {sender}: {quote.get('content') or ''}".strip()
def append_quote_text(base_text: str, item: dict) -> str:
    """Return *base_text* followed by the rendered quote of *item*.

    Args:
        base_text: Message text; falsy or whitespace-only text is skipped.
        item: Message dict from which the quote is extracted.

    Returns:
        The stripped base text and the quote text concatenated, or ``""``
        when both are empty.
    """
    pieces = []
    trimmed = base_text.strip() if base_text else ""
    if trimmed:
        pieces.append(trimmed)
    rendered = quote_to_text(extract_quote(item))
    if rendered:
        pieces.append(rendered)
    # NOTE(review): the parts are joined with an empty separator, so the quote
    # text follows the base text with nothing in between - confirm that no
    # space or newline was intended here.
    return "".join(pieces)