254 lines
6.8 KiB
Python
254 lines
6.8 KiB
Python
import html
|
||
import json
|
||
import re
|
||
import xml.etree.ElementTree as ET
|
||
from typing import Any
|
||
|
||
|
||
# Maximum number of characters clean_message_text() keeps before it cuts the
# text and appends "...".
QUOTE_CONTENT_LIMIT = 600
|
||
|
||
|
||
def extract_contents(item: dict) -> dict:
    """Return the nested contents mapping of *item*, or an empty dict.

    The ``"contents"`` key is consulted first and ``"Contents"`` second; the
    first truthy value wins. A winning value that is not a dict is discarded
    and an empty dict is returned instead.
    """
    for key in ("contents", "Contents"):
        candidate = item.get(key)
        if candidate:
            # The first truthy value decides the outcome, even when it has
            # the wrong type; the other spelling is not consulted after it.
            return candidate if isinstance(candidate, dict) else {}
    return {}
|
||
|
||
|
||
def clean_message_text(value: Any) -> str:
    """Turn *value* into a single-line, length-limited display string.

    Falsy values (``None``, ``""``, ``0``, ...) become the empty string.
    Otherwise the value is stringified, HTML entities are unescaped, the
    ends are stripped and every whitespace run is collapsed to one space.
    Results longer than ``QUOTE_CONTENT_LIMIT`` characters are cut at the
    limit and suffixed with ``"..."``.
    """
    unescaped = html.unescape(str(value or ""))
    collapsed = re.sub(r"\s+", " ", unescaped.strip())
    if len(collapsed) <= QUOTE_CONTENT_LIMIT:
        return collapsed
    return collapsed[:QUOTE_CONTENT_LIMIT] + "..."
|
||
|
||
|
||
def _local_name(tag: str) -> str:
|
||
return tag.rsplit("}", 1)[-1]
|
||
|
||
|
||
def _safe_int(value: Any) -> int | None:
|
||
if value in (None, ""):
|
||
return None
|
||
try:
|
||
return int(str(value).strip())
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _first(data: dict, *keys: str) -> Any:
|
||
for key in keys:
|
||
value = data.get(key)
|
||
if value not in (None, ""):
|
||
return value
|
||
return None
|
||
|
||
|
||
def _has_quote_indicator(data: dict) -> bool:
|
||
keys = {str(key) for key in data.keys()}
|
||
indicators = {
|
||
"quote",
|
||
"refermsg",
|
||
"referMsg",
|
||
"refer",
|
||
"recordInfo",
|
||
"recordinfo",
|
||
"fromusr",
|
||
"fromUser",
|
||
"chatusr",
|
||
"chatUser",
|
||
"displayname",
|
||
"displayName",
|
||
"referContent",
|
||
"svrid",
|
||
"newmsgid",
|
||
"newMsgId",
|
||
}
|
||
return bool(keys & indicators)
|
||
|
||
|
||
def _decode_json(value: str) -> Any:
|
||
try:
|
||
return json.loads(value)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _xml_node_text(node: ET.Element, names: set[str]) -> str:
    """Return the text of the first non-empty element named in *names*.

    *node* and all of its descendants are visited in document order. An
    element matches when its namespace-free tag is in *names* (compared
    case-sensitively). Its text is the concatenation of all text inside
    it, stripped. Elements whose text is empty are skipped, and ``""`` is
    returned when nothing matches.
    """
    matching = (element for element in node.iter() if _local_name(element.tag) in names)
    for element in matching:
        joined = "".join(element.itertext()).strip()
        if joined:
            return joined
    return ""
|
||
|
||
|
||
def _parse_xml_fragment(text: str) -> ET.Element | None:
    """Parse *text* as XML, retrying inside a synthetic ``<root>`` element.

    Returns the parsed root element, or ``None`` when both attempts fail.
    """
    for candidate in (text, f"<root>{text}</root>"):
        try:
            return ET.fromstring(candidate)
        except Exception:
            # Best-effort parsing: a malformed payload just means "no XML
            # here", so the error is deliberately not propagated.
            continue
    return None


def _quote_from_xml(value: str) -> dict | None:
    """Extract a normalized quote from an XML payload containing ``refermsg``.

    The payload is HTML-unescaped and parsed; if that fails, the raw text
    is parsed instead. The raw fallback matters because unescaping turns
    legitimately escaped entities in element text (``&amp;``, ``&lt;``)
    into bare ``&`` / ``<``, which makes well-formed XML unparseable and
    would otherwise drop the quote.

    Args:
        value: Candidate XML text; ``None`` or ``""`` is treated as empty.

    Returns:
        The dict produced by ``_normalize_quote`` for the first element
        whose namespace-free tag equals ``refermsg`` (case-insensitive), or
        ``None`` when the text has no angle brackets, cannot be parsed,
        contains no such element, or yields no usable content.
    """
    # NOTE(review): xml.etree.ElementTree is not hardened against maliciously
    # constructed XML; consider a hardened parser if payloads are untrusted.
    raw = (value or "").strip()
    unescaped = html.unescape(value or "").strip()
    if "<" not in unescaped or ">" not in unescaped:
        return None

    # Unescaped text is tried first so payloads that already parsed keep
    # producing exactly the same result as before.
    root = _parse_xml_fragment(unescaped)
    if root is None and raw != unescaped:
        root = _parse_xml_fragment(raw)
    if root is None:
        return None

    refer_node = next(
        (node for node in root.iter() if _local_name(node.tag).lower() == "refermsg"),
        None,
    )
    if refer_node is None:
        return None

    content = _xml_node_text(refer_node, {"content", "title", "desc"})
    sender_name = _xml_node_text(refer_node, {"displayname", "nickname", "fromnickname"})
    sender = _xml_node_text(refer_node, {"fromusr", "chatusr", "sender"})
    msg_type = _safe_int(_xml_node_text(refer_node, {"type"}))
    seq = _safe_int(_xml_node_text(refer_node, {"seq", "msgid", "newmsgid", "svrid"}))

    return _normalize_quote(
        {
            "sender": sender,
            "sender_name": sender_name,
            "content": content,
            "type": msg_type,
            "seq": seq,
        }
    )
|
||
|
||
|
||
def _find_quote_payload(value: Any, allow_plain_text: bool = False) -> dict | None:
    """Search *value* recursively for a quote and return it normalized.

    Strings are tried as JSON (only when they start with ``{`` or ``[``),
    then as XML, and finally, if *allow_plain_text* is set, as the quote
    content itself. Lists return the first quote found in any element.
    Dicts are probed in three steps: the explicit quote keys, then the dict
    itself as a quote record, then every remaining value.

    Args:
        value: A string, list, dict or anything else (other types give
            ``None``).
        allow_plain_text: Whether a bare string, or a dict without any
            quote-indicator key, may be accepted as a quote.

    Returns:
        The normalized quote dict, or ``None`` when nothing was found.
    """
    if value in (None, ""):
        return None

    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        # Only text that can be a JSON object or array is worth decoding.
        decoded = _decode_json(text) if text.startswith(("{", "[")) else None
        if decoded is not None:
            return _find_quote_payload(decoded, allow_plain_text=allow_plain_text)
        xml_quote = _quote_from_xml(text)
        if xml_quote:
            return xml_quote
        return _normalize_quote({"content": text}) if allow_plain_text else None

    if isinstance(value, list):
        for element in value:
            quote = _find_quote_payload(element, allow_plain_text=allow_plain_text)
            if quote:
                return quote
        return None

    if not isinstance(value, dict):
        return None

    explicit_keys = ("quote", "refermsg", "referMsg", "refer", "recordInfo", "recordinfo")
    for key in explicit_keys:
        if key in value:
            # Anything stored under an explicit quote key may be plain text.
            quote = _find_quote_payload(value.get(key), allow_plain_text=True)
            if quote:
                return quote

    if allow_plain_text or _has_quote_indicator(value):
        quote = _normalize_quote(value)
        if quote:
            return quote

    for key, nested in value.items():
        # Values under the explicit keys were already searched above with
        # plain text allowed, which finds a quote whenever the stricter
        # search would. Searching them again cannot change the result and
        # made chains like {"quote": {"quote": ...}} cost 2**depth calls.
        if key in explicit_keys:
            continue
        quote = _find_quote_payload(nested, allow_plain_text=False)
        if quote:
            return quote
    return None
|
||
|
||
|
||
def _normalize_quote(data: dict) -> dict | None:
    """Build a canonical quote record from a loosely keyed dict.

    Each output field is taken from the first usable value among several
    alternative key spellings. Text fields go through
    ``clean_message_text`` and numeric fields through ``_safe_int``.

    Returns:
        A dict with the keys ``sender``, ``sender_name``, ``content``,
        ``type`` and ``seq``, or ``None`` when no non-empty content exists.
    """
    raw_content = _first(
        data,
        "content",
        "Content",
        "text",
        "title",
        "desc",
        "digest",
        "displayContent",
        "referContent",
    )
    content = clean_message_text(raw_content)
    if not content:
        # A quote without any text is treated as no quote at all.
        return None

    raw_sender = _first(data, "sender", "Sender", "fromusr", "fromUser", "chatusr", "chatUser", "from")
    raw_sender_name = _first(
        data,
        "sender_name",
        "senderName",
        "SenderName",
        "displayname",
        "displayName",
        "nickname",
        "nickName",
    )
    raw_type = _first(data, "type", "Type", "msgType", "subType")
    raw_seq = _first(data, "seq", "Seq", "sort_seq", "msgid", "msgId", "newmsgid", "newMsgId", "svrid")

    return {
        "sender": clean_message_text(raw_sender),
        "sender_name": clean_message_text(raw_sender_name),
        "content": content,
        "type": _safe_int(raw_type),
        "seq": _safe_int(raw_seq),
    }
|
||
|
||
|
||
def extract_quote(item: dict | None) -> dict | None:
    """Return the normalized quote carried by a message *item*, if any.

    Dedicated quote fields on the item and on its contents mapping are
    searched first, with plain text accepted as quote content. If none of
    them yields a quote, ``contents["appmsg"]`` and the item's ``content``
    / ``Content`` fields are searched with plain text rejected.

    Returns:
        The first quote found, or ``None`` (also when *item* is not a dict).
    """
    if not isinstance(item, dict):
        return None

    contents = extract_contents(item)
    item_keys = ("quote", "Quote", "refer", "recordInfo")
    contents_keys = ("quote", "refer", "refermsg", "referMsg", "recordInfo", "recordinfo")

    dedicated = [item.get(key) for key in item_keys]
    dedicated.extend(contents.get(key) for key in contents_keys)
    generic = [contents.get("appmsg"), item.get("content"), item.get("Content")]

    # Each phase pairs a group of sources with its plain-text policy.
    for plain_text_ok, sources in ((True, dedicated), (False, generic)):
        for source in sources:
            quote = _find_quote_payload(source, allow_plain_text=plain_text_ok)
            if quote:
                return quote
    return None
|
||
|
||
|
||
def attach_quote(item: dict) -> dict:
    """Store the extracted quote under ``item["quote"]`` and return *item*.

    The dict is modified in place; the stored value is ``None`` when no
    quote is found.
    """
    extracted = extract_quote(item)
    item["quote"] = extracted
    return item
|
||
|
||
|
||
def quote_to_text(quote: dict | None) -> str:
|
||
if not quote:
|
||
return ""
|
||
sender = quote.get("sender_name") or quote.get("sender") or "未知"
|
||
seq = quote.get("seq")
|
||
seq_text = f" seq={seq}" if seq else ""
|
||
return f"[引用消息{seq_text}] {sender}: {quote.get('content') or ''}".strip()
|
||
|
||
|
||
def append_quote_text(base_text: str, item: dict) -> str:
    """Join *base_text* with the rendered quote of *item*.

    The stripped base text and the quote line are joined with a full-width
    semicolon. Either part is left out when empty, so the result may be
    just one of them or ``""``.
    """
    stripped_base = base_text.strip() if base_text else ""
    quote_text = quote_to_text(extract_quote(item))
    return ";".join(piece for piece in (stripped_base, quote_text) if piece)
|