import html import json import re import xml.etree.ElementTree as ET from typing import Any QUOTE_CONTENT_LIMIT = 600 def extract_contents(item: dict) -> dict: contents = item.get("contents") or item.get("Contents") or {} return contents if isinstance(contents, dict) else {} def clean_message_text(value: Any) -> str: text = html.unescape(str(value or "")).strip() text = re.sub(r"\s+", " ", text) if len(text) > QUOTE_CONTENT_LIMIT: text = text[:QUOTE_CONTENT_LIMIT] + "..." return text def _local_name(tag: str) -> str: return tag.rsplit("}", 1)[-1] def _safe_int(value: Any) -> int | None: if value in (None, ""): return None try: return int(str(value).strip()) except Exception: return None def _first(data: dict, *keys: str) -> Any: for key in keys: value = data.get(key) if value not in (None, ""): return value return None def _has_quote_indicator(data: dict) -> bool: keys = {str(key) for key in data.keys()} indicators = { "quote", "refermsg", "referMsg", "refer", "recordInfo", "recordinfo", "fromusr", "fromUser", "chatusr", "chatUser", "displayname", "displayName", "referContent", "svrid", "newmsgid", "newMsgId", } return bool(keys & indicators) def _decode_json(value: str) -> Any: try: return json.loads(value) except Exception: return None def _xml_node_text(node: ET.Element, names: set[str]) -> str: for child in node.iter(): if _local_name(child.tag) in names: text = "".join(child.itertext()).strip() if text: return text return "" def _quote_from_xml(value: str) -> dict | None: text = html.unescape(value or "").strip() if "<" not in text or ">" not in text: return None try: root = ET.fromstring(text) except Exception: try: root = ET.fromstring(f"{text}") except Exception: return None refer_node = None for node in root.iter(): if _local_name(node.tag).lower() == "refermsg": refer_node = node break if refer_node is None: return None content = _xml_node_text(refer_node, {"content", "title", "desc"}) sender_name = _xml_node_text(refer_node, {"displayname", "nickname", "fromnickname"}) sender = _xml_node_text(refer_node, {"fromusr", "chatusr", "sender"}) msg_type = _safe_int(_xml_node_text(refer_node, {"type"})) seq = _safe_int(_xml_node_text(refer_node, {"seq", "msgid", "newmsgid", "svrid"})) return _normalize_quote( { "sender": sender, "sender_name": sender_name, "content": content, "type": msg_type, "seq": seq, } ) def _find_quote_payload(value: Any, allow_plain_text: bool = False) -> dict | None: if value in (None, ""): return None if isinstance(value, str): text = value.strip() if not text: return None decoded = _decode_json(text) if text[:1] in ("{", "[") else None if decoded is not None: return _find_quote_payload(decoded, allow_plain_text=allow_plain_text) xml_quote = _quote_from_xml(text) if xml_quote: return xml_quote if allow_plain_text: return _normalize_quote({"content": text}) return None if isinstance(value, list): for item in value: quote = _find_quote_payload(item, allow_plain_text=allow_plain_text) if quote: return quote return None if not isinstance(value, dict): return None for key in ("quote", "refermsg", "referMsg", "refer", "recordInfo", "recordinfo"): if key in value: quote = _find_quote_payload(value.get(key), allow_plain_text=True) if quote: return quote quote = _normalize_quote(value) if allow_plain_text or _has_quote_indicator(value) else None if quote: return quote for nested in value.values(): quote = _find_quote_payload(nested, allow_plain_text=False) if quote: return quote return None def _normalize_quote(data: dict) -> dict | None: content = clean_message_text( _first( data, "content", "Content", "text", "title", "desc", "digest", "displayContent", "referContent", ) ) if not content: return None sender = clean_message_text( _first(data, "sender", "Sender", "fromusr", "fromUser", "chatusr", "chatUser", "from") ) sender_name = clean_message_text( _first(data, "sender_name", "senderName", "SenderName", "displayname", "displayName", "nickname", "nickName") ) msg_type = _safe_int(_first(data, "type", "Type", "msgType", "subType")) seq = _safe_int(_first(data, "seq", "Seq", "sort_seq", "msgid", "msgId", "newmsgid", "newMsgId", "svrid")) return { "sender": sender, "sender_name": sender_name, "content": content, "type": msg_type, "seq": seq, } def extract_quote(item: dict | None) -> dict | None: if not isinstance(item, dict): return None contents = extract_contents(item) explicit_sources = ( item.get("quote"), item.get("Quote"), item.get("refer"), item.get("recordInfo"), contents.get("quote"), contents.get("refer"), contents.get("refermsg"), contents.get("referMsg"), contents.get("recordInfo"), contents.get("recordinfo"), ) for source in explicit_sources: quote = _find_quote_payload(source, allow_plain_text=True) if quote: return quote for source in ( contents.get("appmsg"), item.get("content"), item.get("Content"), ): quote = _find_quote_payload(source, allow_plain_text=False) if quote: return quote return None def attach_quote(item: dict) -> dict: item["quote"] = extract_quote(item) return item def quote_to_text(quote: dict | None) -> str: if not quote: return "" sender = quote.get("sender_name") or quote.get("sender") or "未知" seq = quote.get("seq") seq_text = f" seq={seq}" if seq else "" return f"[引用消息{seq_text}] {sender}: {quote.get('content') or ''}".strip() def append_quote_text(base_text: str, item: dict) -> str: parts = [base_text.strip()] if base_text and base_text.strip() else [] quote_text = quote_to_text(extract_quote(item)) if quote_text: parts.append(quote_text) return ";".join(parts)