Files
get_wechat/chatlog_fastAPI/services/report_learning.py

140 lines
4.7 KiB
Python

import re
import aiosqlite
from services.fts import build_match_query
# Default value of the `limit` parameter of build_report_learning_context,
# i.e. how many example reports are collected unless the caller overrides it.
MAX_EXAMPLES = 3
# Default per-example character cap used by _compact; longer text is cut at
# this length and a "\n..." marker is appended.
MAX_EXAMPLE_CHARS = 1800
# Character budget checked by _format_examples while appending example blocks
# (the heading length counts towards it).
MAX_CONTEXT_CHARS = 5200
def _compact(text: str, limit: int = MAX_EXAMPLE_CHARS) -> str:
    """Normalise *text* and cap it at *limit* characters.

    Falsy input is treated as an empty string. Surrounding whitespace is
    stripped and any run of three or more newlines is collapsed to exactly
    two. When the cleaned text is longer than *limit*, it is cut at *limit*
    characters, trailing whitespace is removed from the cut, and a
    ``"\\n..."`` marker is appended (so the result may be slightly longer
    than *limit*).
    """
    cleaned = (text or "").strip()
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    if len(cleaned) > limit:
        return cleaned[:limit].rstrip() + "\n..."
    return cleaned
def _format_examples(rows: list[aiosqlite.Row], purpose: str) -> str:
    """Render *rows* as one prompt-ready text block of reference examples.

    The text starts with a heading chosen by *purpose* (``"topic"`` and
    ``"summary"`` have dedicated headings; anything else gets a generic one),
    followed by one numbered section per row holding the group label, the
    topic title and the compacted report content. Sections are appended in
    order until adding the next one would push the running length past
    ``MAX_CONTEXT_CHARS``; that section and all later ones are dropped.

    Returns an empty string when *rows* is empty.
    """
    if not rows:
        return ""

    headings_by_purpose = {
        "topic": "历史人工修订报告参考(用于学习话题命名和分类口径)",
        "summary": "历史人工修订报告参考(只学习结构、措辞和关注点,不得照抄历史事实)",
    }
    if purpose in headings_by_purpose:
        header = headings_by_purpose[purpose]
    else:
        header = "历史人工修订报告参考"

    pieces = [header]
    used = len(header)
    for number, record in enumerate(rows, start=1):
        # Group label falls back from display name to talker to numeric id.
        source = record["group_name"] or record["talker"] or record["group_id"]
        title = record["title"]
        body = _compact(record["content"])
        entry = "".join(
            (
                f"\n\n--- 示例 {number} ---\n",
                f"群聊:{source}\n",
                f"话题标题:{title}\n",
                f"报告内容:\n{body}",
            )
        )
        used += len(entry)
        if used > MAX_CONTEXT_CHARS:
            # Budget exhausted: stop here instead of skipping ahead to a
            # possibly shorter later example.
            break
        pieces.append(entry)

    return "".join(pieces).strip()
# Shared head of every lookup below: curated knowledge docs joined with their
# topic and (when present) group. Callers append further AND-conditions and an
# ORDER BY / LIMIT tail.
_CURATED_DOCS_SELECT = """
    SELECT k.id, k.content, k.updated_at, t.id AS topic_id, t.title, t.group_id,
           g.name AS group_name, g.talker
    FROM knowledge_docs k
    JOIN topics t ON t.id = k.topic_id
    LEFT JOIN groups g ON g.id = t.group_id
    WHERE k.curated_at IS NOT NULL
"""

# Tail used by the cross-group lookups: docs of the preferred group sort first,
# then most recently curated / updated. Binds two parameters: group id, limit.
_SAME_GROUP_FIRST_TAIL = """
    ORDER BY CASE WHEN t.group_id=? THEN 0 ELSE 1 END,
             k.curated_at DESC,
             k.updated_at DESC
    LIMIT ?
"""


async def _fetch_learning_rows(
    db: aiosqlite.Connection,
    sql: str,
    params: list[object],
) -> list[aiosqlite.Row]:
    """Execute *sql* with *params* on *db* and return all resulting rows."""
    async with db.execute(sql, params) as cur:
        return list(await cur.fetchall())


def _take_unseen_docs(
    rows: list[aiosqlite.Row],
    selected: list[aiosqlite.Row],
    seen_doc_ids: set[int],
    limit: int,
) -> None:
    """Append rows whose ``id`` is not yet in *seen_doc_ids* to *selected*.

    Stops as soon as *selected* holds *limit* rows. Both *selected* and
    *seen_doc_ids* are updated in place.
    """
    for row in rows:
        if len(selected) >= limit:
            break
        doc_id = int(row["id"])
        if doc_id in seen_doc_ids:
            continue
        selected.append(row)
        seen_doc_ids.add(doc_id)


async def build_report_learning_context(
    db: aiosqlite.Connection,
    *,
    group_id: int | None,
    query: str = "",
    exclude_topic_id: int | None = None,
    purpose: str = "summary",
    limit: int = MAX_EXAMPLES,
) -> str:
    """Collect curated reports and format them as reference examples.

    Up to *limit* distinct curated knowledge docs (``curated_at IS NOT NULL``)
    are gathered in three stages, each stage only filling what is still
    missing:

    1. the most recently curated docs of *group_id* (skipped when it is None);
    2. docs matching the full-text query built from *query* by
       ``build_match_query`` (skipped when that returns a falsy value);
    3. any curated docs at all.

    Stages 2 and 3 rank docs of *group_id* ahead of other groups.

    Args:
        db: Open connection. Rows are read by column name, so the connection
            is assumed to produce mapping-style rows (e.g. ``aiosqlite.Row``).
        group_id: Preferred group, or None for no preference.
        query: Free text handed to ``build_match_query`` for stage 2.
        exclude_topic_id: Topic whose doc must never be returned, if given.
        purpose: Forwarded to ``_format_examples`` to pick the heading.
        limit: Maximum number of examples; values <= 0 yield an empty string.

    Returns:
        The text produced by ``_format_examples``; empty when nothing is found.
    """
    # SQLite treats a negative LIMIT as "no limit", and a negative slice bound
    # would drop rows from the wrong end, so bail out before touching the DB.
    if limit <= 0:
        return ""

    exclude_sql = ""
    exclude_params: list[object] = []
    if exclude_topic_id is not None:
        exclude_sql = " AND t.id<>?"
        exclude_params.append(exclude_topic_id)

    # Explicit None check (not `group_id or -1`) so that a group id of 0 is
    # still ranked first, consistently with the stage-1 condition below.
    # -1 is a placeholder assumed not to match any real group id.
    preferred_group = group_id if group_id is not None else -1

    selected: list[aiosqlite.Row] = []
    seen_doc_ids: set[int] = set()

    # Stage 1: newest curated docs from the same group.
    if group_id is not None:
        rows = await _fetch_learning_rows(
            db,
            f"""
            {_CURATED_DOCS_SELECT}
              AND t.group_id=?
              {exclude_sql}
            ORDER BY k.curated_at DESC, k.updated_at DESC
            LIMIT ?
            """,
            [group_id, *exclude_params, limit],
        )
        _take_unseen_docs(rows, selected, seen_doc_ids, limit)

    # Stage 2: full-text matches across all groups.
    if len(selected) < limit:
        fts_query = build_match_query(query or "")
        if fts_query:
            # Fetch enough rows that, even if every already-selected doc shows
            # up again, the remaining slots can still be filled.
            fetch_count = (limit - len(selected)) + len(seen_doc_ids)
            rows = await _fetch_learning_rows(
                db,
                f"""
                {_CURATED_DOCS_SELECT}
                  AND k.id IN (SELECT doc_id FROM knowledge_fts WHERE knowledge_fts MATCH ?)
                  {exclude_sql}
                {_SAME_GROUP_FIRST_TAIL}
                """,
                [fts_query, *exclude_params, preferred_group, fetch_count],
            )
            _take_unseen_docs(rows, selected, seen_doc_ids, limit)

    # Stage 3: fall back to any curated doc.
    if len(selected) < limit:
        # Same over-fetch reasoning as stage 2; here the already-selected
        # same-group docs are ranked first, so they must be accounted for.
        fetch_count = (limit - len(selected)) + len(seen_doc_ids)
        rows = await _fetch_learning_rows(
            db,
            f"""
            {_CURATED_DOCS_SELECT}
              {exclude_sql}
            {_SAME_GROUP_FIRST_TAIL}
            """,
            [*exclude_params, preferred_group, fetch_count],
        )
        _take_unseen_docs(rows, selected, seen_doc_ids, limit)

    return _format_examples(selected[:limit], purpose)