import html
import json
import re
import xml.etree.ElementTree as ET
from typing import Any
# Maximum number of characters clean_message_text keeps; longer text is cut
# to this length and suffixed with "...".
QUOTE_CONTENT_LIMIT = 600
def extract_contents(item: dict) -> dict:
    """Return the message's ``contents`` mapping, or an empty dict.

    ``"contents"`` is consulted first; ``"Contents"`` is only used when the
    former is missing or falsy. Whichever value is picked is returned only
    if it is a ``dict`` -- any other type yields ``{}``.
    """
    for key in ("contents", "Contents"):
        candidate = item.get(key)
        if candidate:
            # The first truthy value wins, even if it turns out not to be a dict.
            return candidate if isinstance(candidate, dict) else {}
    return {}
def clean_message_text(value: Any) -> str:
    """Normalize a raw message value into a single-line, bounded string.

    Falsy values become ``""``. Anything else is stringified, HTML entities
    are decoded, surrounding whitespace is trimmed and every run of
    whitespace collapses to one space. A result longer than
    ``QUOTE_CONTENT_LIMIT`` characters is cut to that length and suffixed
    with ``"..."``.
    """
    raw = str(value) if value else ""
    collapsed = re.sub(r"\s+", " ", html.unescape(raw).strip())
    if len(collapsed) <= QUOTE_CONTENT_LIMIT:
        return collapsed
    return collapsed[:QUOTE_CONTENT_LIMIT] + "..."
def _local_name(tag: str) -> str:
    """Return the part of *tag* after its last ``}``.

    This strips an ElementTree-style ``{namespace}`` prefix; a tag without
    any ``}`` is returned unchanged.
    """
    _, _, local = tag.rpartition("}")
    return local
def _safe_int(value: Any) -> int | None:
if value in (None, ""):
return None
try:
return int(str(value).strip())
except Exception:
return None
def _first(data: dict, *keys: str) -> Any:
    """Return the first value among *keys* that is neither ``None`` nor ``""``.

    Keys are tried in the order given. Other falsy values such as ``0`` or
    ``[]`` count as present. ``None`` is returned when nothing qualifies.
    """
    candidates = (data.get(key) for key in keys)
    return next((found for found in candidates if found not in (None, "")), None)
def _has_quote_indicator(data: dict) -> bool:
    """Tell whether *data* has at least one key that hints at a quoted message.

    Keys are compared after ``str()`` conversion and the match is
    case-sensitive against the fixed list of names below.
    """
    indicator_names = (
        "quote",
        "refermsg",
        "referMsg",
        "refer",
        "recordInfo",
        "recordinfo",
        "fromusr",
        "fromUser",
        "chatusr",
        "chatUser",
        "displayname",
        "displayName",
        "referContent",
        "svrid",
        "newmsgid",
        "newMsgId",
    )
    present = set(map(str, data))
    return any(name in present for name in indicator_names)
def _decode_json(value: str) -> Any:
    """Decode *value* as JSON, returning ``None`` if decoding fails.

    Note that a valid JSON ``null`` also decodes to ``None``, so callers
    cannot tell it apart from a failure.
    """
    try:
        decoded = json.loads(value)
    except Exception:
        # Best-effort decode: malformed or non-string input is simply "no JSON".
        decoded = None
    return decoded
def _xml_node_text(node: ET.Element, names: set[str]) -> str:
    """Return the text of the first element under *node* whose name is in *names*.

    *node* itself and all of its descendants are visited in iteration order.
    An element matches when its namespace-stripped tag is in *names*
    (case-sensitive). Its text is the concatenation of all nested text,
    stripped; elements whose text is empty are skipped. Returns ``""`` when
    nothing matches.
    """
    for element in node.iter():
        if _local_name(element.tag) not in names:
            continue
        joined = "".join(element.itertext()).strip()
        if joined:
            return joined
    return ""
def _parse_xml_fragment(text: str) -> ET.Element | None:
    """Parse *text* as XML, tolerating fragments without a single root element."""
    # The synthetic wrapper lets payloads with several top-level elements (or
    # stray text around the root) parse instead of being rejected outright.
    for candidate in (text, f"<root>{text}</root>"):
        try:
            return ET.fromstring(candidate)
        except (ET.ParseError, ValueError):
            continue
    return None


def _quote_from_xml(value: str) -> dict | None:
    """Extract a quoted-message summary from an XML payload with a ``refermsg`` node.

    The payload is HTML-unescaped and parsed; the first element whose local
    name equals ``refermsg`` (case-insensitive) supplies the sender, display
    name, content, type and sequence fields, which are then passed through
    ``_normalize_quote``.

    Args:
        value: Raw XML text, possibly HTML-escaped. ``None`` or ``""`` is accepted.

    Returns:
        The normalized quote dict, or ``None`` when the text does not look
        like markup, cannot be parsed, has no ``refermsg`` element, or the
        element yields no content.
    """
    # NOTE(review): the standard library documents xml.etree.ElementTree as
    # not hardened against maliciously constructed data; confirm that is
    # acceptable for the payloads handled here.
    raw = (value or "").strip()
    text = html.unescape(value or "").strip()
    if "<" not in text or ">" not in text:
        return None

    root = _parse_xml_fragment(text)
    if root is None and raw != text:
        # Unescaping turns valid entities such as ``&amp;`` into bare
        # characters that the parser rejects; retry with the payload as given.
        root = _parse_xml_fragment(raw)
    if root is None:
        return None

    refer_node = next(
        (node for node in root.iter() if _local_name(node.tag).lower() == "refermsg"),
        None,
    )
    if refer_node is None:
        return None

    content = _xml_node_text(refer_node, {"content", "title", "desc"})
    sender_name = _xml_node_text(refer_node, {"displayname", "nickname", "fromnickname"})
    sender = _xml_node_text(refer_node, {"fromusr", "chatusr", "sender"})
    msg_type = _safe_int(_xml_node_text(refer_node, {"type"}))
    seq = _safe_int(_xml_node_text(refer_node, {"seq", "msgid", "newmsgid", "svrid"}))
    return _normalize_quote(
        {
            "sender": sender,
            "sender_name": sender_name,
            "content": content,
            "type": msg_type,
            "seq": seq,
        }
    )
def _find_quote_payload(value: Any, allow_plain_text: bool = False) -> dict | None:
    """Search *value* recursively for something that normalizes to a quote.

    Handling by type:

    * ``None`` / ``""`` and unsupported types: no quote.
    * ``str``: text starting with ``{`` or ``[`` is decoded as JSON and, if
      that succeeds, the decoded value is searched instead. Otherwise the
      text is tried as ``refermsg`` XML, and finally -- only when
      *allow_plain_text* is set -- treated as the quote content itself.
    * ``list``: the first item that yields a quote wins.
    * ``dict``: known wrapper keys are searched first (plain text allowed),
      then the dict itself is normalized if plain text is allowed or it
      carries a quote indicator key, then every nested value is searched
      with plain text disallowed.

    Returns the normalized quote dict, or ``None`` when nothing is found.
    """
    if value in (None, ""):
        return None

    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return None
        if stripped.startswith(("{", "[")):
            parsed = _decode_json(stripped)
            if parsed is not None:
                return _find_quote_payload(parsed, allow_plain_text=allow_plain_text)
        from_xml = _quote_from_xml(stripped)
        if from_xml:
            return from_xml
        return _normalize_quote({"content": stripped}) if allow_plain_text else None

    if isinstance(value, list):
        item_hits = (
            _find_quote_payload(entry, allow_plain_text=allow_plain_text)
            for entry in value
        )
        return next((hit for hit in item_hits if hit), None)

    if not isinstance(value, dict):
        return None

    # Explicit wrapper keys: whatever sits under them is assumed to be a quote,
    # so a bare string there is accepted as content.
    for wrapper_key in ("quote", "refermsg", "referMsg", "refer", "recordInfo", "recordinfo"):
        if wrapper_key not in value:
            continue
        found = _find_quote_payload(value[wrapper_key], allow_plain_text=True)
        if found:
            return found

    if allow_plain_text or _has_quote_indicator(value):
        found = _normalize_quote(value)
        if found:
            return found

    nested_hits = (
        _find_quote_payload(child, allow_plain_text=False)
        for child in value.values()
    )
    return next((hit for hit in nested_hits if hit), None)
def _normalize_quote(data: dict) -> dict | None:
    """Build the canonical quote dict from a loosely-keyed mapping.

    Each output field is taken from the first alias key in *data* holding a
    value other than ``None`` or ``""``. Text fields go through
    ``clean_message_text``; ``type`` and ``seq`` go through ``_safe_int``.

    Returns ``None`` when no content can be found, otherwise a dict with the
    keys ``sender``, ``sender_name``, ``content``, ``type`` and ``seq``.
    """
    content_keys = (
        "content",
        "Content",
        "text",
        "title",
        "desc",
        "digest",
        "displayContent",
        "referContent",
    )
    sender_keys = ("sender", "Sender", "fromusr", "fromUser", "chatusr", "chatUser", "from")
    name_keys = (
        "sender_name",
        "senderName",
        "SenderName",
        "displayname",
        "displayName",
        "nickname",
        "nickName",
    )
    type_keys = ("type", "Type", "msgType", "subType")
    seq_keys = ("seq", "Seq", "sort_seq", "msgid", "msgId", "newmsgid", "newMsgId", "svrid")

    content = clean_message_text(_first(data, *content_keys))
    if not content:
        # A quote without any content is not worth reporting.
        return None

    return {
        "sender": clean_message_text(_first(data, *sender_keys)),
        "sender_name": clean_message_text(_first(data, *name_keys)),
        "content": content,
        "type": _safe_int(_first(data, *type_keys)),
        "seq": _safe_int(_first(data, *seq_keys)),
    }
def extract_quote(item: dict | None) -> dict | None:
    """Locate and normalize the quoted message referenced by *item*.

    Explicit quote fields on the item and on its contents mapping are tried
    first, with plain strings accepted as quote content. If none of them
    yields a quote, ``contents["appmsg"]`` and the item's ``content`` /
    ``Content`` are searched, where a plain string is not accepted.

    Returns the normalized quote dict, or ``None`` if *item* is not a dict
    or no quote is found.
    """
    if not isinstance(item, dict):
        return None

    contents = extract_contents(item)
    explicit = [item.get(key) for key in ("quote", "Quote", "refer", "recordInfo")]
    explicit += [
        contents.get(key)
        for key in ("quote", "refer", "refermsg", "referMsg", "recordInfo", "recordinfo")
    ]
    implicit = [contents.get("appmsg"), item.get("content"), item.get("Content")]

    for sources, plain_ok in ((explicit, True), (implicit, False)):
        for source in sources:
            found = _find_quote_payload(source, allow_plain_text=plain_ok)
            if found:
                return found
    return None
def attach_quote(item: dict) -> dict:
    """Store the extracted quote under ``item["quote"]`` and return *item*.

    The dict is modified in place; any existing ``"quote"`` value is
    replaced by the normalized quote, or by ``None`` when none is found.
    """
    extracted = extract_quote(item)
    item["quote"] = extracted
    return item
def quote_to_text(quote: dict | None) -> str:
if not quote:
return ""
sender = quote.get("sender_name") or quote.get("sender") or "未知"
seq = quote.get("seq")
seq_text = f" seq={seq}" if seq else ""
return f"[引用消息{seq_text}] {sender}: {quote.get('content') or ''}".strip()
def append_quote_text(base_text: str, item: dict) -> str:
    """Join *base_text* with the rendered quote of *item*.

    The stripped base text and the quote text are each included only when
    non-empty, and are joined with a full-width semicolon. Returns ``""``
    when both are empty.
    """
    parts = []
    trimmed = base_text.strip() if base_text else ""
    if trimmed:
        parts.append(trimmed)

    quote_text = quote_to_text(extract_quote(item))
    if quote_text:
        parts.append(quote_text)
    return ";".join(parts)