import aiosqlite
import asyncio
import logging
import time
from collections.abc import AsyncIterator
from pathlib import Path

from config import settings
from services.http_client import shared_client

log = logging.getLogger(__name__)

# Directory that holds the configured database; per-account databases are
# created next to it.
_data_db_dir = Path(settings.db_path).resolve().parent
_data_db_dir.mkdir(parents=True, exist_ok=True)

_current_db_path = str(Path(settings.db_path).resolve())
_initialized_dbs = set()
_resolved_wxid: str | None = None
_wxid_last_resolved: float = 0.0
_WXID_TTL = 60.0  # Force re-detection after 60 s so an account switch is noticed.
STALE_SUMMARIZE_ERROR = "AI 报告生成任务超过 15 分钟未完成,已自动标记为失败,可重新生成"

# Time window sent with every chatlog query.
_CHATLOG_TIME_RANGE = "1970-01-01,2099-12-31"


def _db_path_for_wxid(wxid: str) -> str:
    """Return the absolute database path to use for *wxid*.

    A falsy wxid or the literal ``"default"`` maps to ``settings.db_path``.
    Any other value maps to ``knowledge_<safe>.db`` next to it, where
    ``<safe>`` keeps only alphanumerics, ``_`` and ``-`` from the wxid.
    """
    if wxid and wxid != "default":
        safe = "".join(c for c in wxid if c.isalnum() or c in ("_", "-"))
        return str((_data_db_dir / f"knowledge_{safe}.db").resolve())
    return str(Path(settings.db_path).resolve())


def reset_wxid_cache():
    """Forget the cached wxid so the next lookup queries the chatlog service."""
    global _resolved_wxid, _wxid_last_resolved
    _resolved_wxid = None
    _wxid_last_resolved = 0.0


def _remember_wxid(wxid: str) -> str:
    """Store *wxid* in the module cache with the current time and return it."""
    global _resolved_wxid, _wxid_last_resolved
    _resolved_wxid = wxid
    _wxid_last_resolved = time.time()
    return wxid


async def _find_self_sender(client, base: str, talker, limit: int) -> str | None:
    """Return the sender of the first ``isSelf`` message in *talker*'s chatlog.

    Returns ``None`` when the request does not answer 200, when no message is
    flagged ``isSelf``, or when the flagged messages carry no sender.
    Network and decoding errors propagate to the caller.
    """
    resp = await client.get(
        f"{base}/api/v1/chatlog",
        params={
            "talker": talker,
            "limit": limit,
            "time": _CHATLOG_TIME_RANGE,
            "format": "json",
        },
        timeout=10.0,
    )
    if resp.status_code != 200:
        return None
    # A null "items" value is treated as an empty list.
    for msg in resp.json().get("items") or ():
        # Skip self messages without a sender so None/"" is never cached.
        if msg.get("isSelf") and msg.get("sender"):
            return msg["sender"]
    return None


async def get_current_wxid(force: bool = False) -> str:
    """Resolve the wxid of the currently logged-in account.

    The cached value is returned while it is younger than ``_WXID_TTL``
    seconds, unless *force* is true. Otherwise the chatlog service is queried:
    first the ``filehelper`` conversation, then up to 10 chatrooms, looking
    for a message flagged ``isSelf``.

    Returns the sender of that message, or ``"default"`` when none is found.
    Lookup errors are logged and treated as "not found". When *force* is true
    and nothing is found, the cache is cleared as well.
    """
    now = time.time()
    cache_is_fresh = (
        _resolved_wxid
        and _resolved_wxid != "default"
        and (now - _wxid_last_resolved) < _WXID_TTL
    )
    if not force and cache_is_fresh:
        return _resolved_wxid

    # Re-resolve the current wxid.
    base = settings.chatlog_base_url
    client = shared_client()

    try:
        wxid = await _find_self_sender(client, base, "filehelper", 100)
        if wxid:
            return _remember_wxid(wxid)
    except Exception as exc:
        log.warning("wxid lookup via filehelper failed: %s", exc)

    try:
        resp = await client.get(
            f"{base}/api/v1/chatroom",
            params={"limit": 10, "format": "json"},
            timeout=10.0,
        )
        if resp.status_code == 200:
            for room in resp.json().get("items") or ():
                wxid = await _find_self_sender(client, base, room.get("name"), 50)
                if wxid:
                    return _remember_wxid(wxid)
    except Exception as exc:
        log.warning("wxid lookup via chatrooms failed: %s", exc)

    if force:
        reset_wxid_cache()
    return "default"


async def update_db_path(force: bool = False) -> str:
    """Point the module at the database of the current account.

    Resolves the current wxid and, when its database path differs from the
    active one, switches to it, initializes it and resets the chatlog
    client's account cache. Returns the active database path.
    """
    global _current_db_path
    wxid = await get_current_wxid(force=force)
    new_path = _db_path_for_wxid(wxid)
    if new_path != _current_db_path:
        log.info("Switching database to %s", new_path)
        _current_db_path = new_path
        await init_db(new_path)
        # Account switch: clear the chatlog client's contact-db path and avatar
        # cache so avatars of the previous account are not shown.
        # Imported locally to avoid a potential circular import.
        from services.chatlog_client import chatlog_client

        chatlog_client.reset_account_cache()
    return _current_db_path


def get_active_db_path() -> str:
    """Return the path of the database currently in use."""
    return _current_db_path


async def get_db() -> AsyncIterator[aiosqlite.Connection]:
    """Yield a connection to the active database, initializing it if needed.

    Rows are returned as ``aiosqlite.Row``. The connection is closed when the
    generator is finalized.
    """
    path = get_active_db_path()
    if path not in _initialized_dbs:
        await init_db(path)
    async with aiosqlite.connect(path) as db:
        db.row_factory = aiosqlite.Row
        yield db


async def _table_columns(db, table: str) -> set:
    """Return the column names of *table* (a hard-coded identifier only)."""
    async with db.execute(f"PRAGMA table_info({table})") as cur:
        return {row[1] for row in await cur.fetchall()}


async def _create_schema(db) -> None:
    """Create every table that does not exist yet."""
    await db.executescript("""
        CREATE TABLE IF NOT EXISTS groups (
            id INTEGER PRIMARY KEY,
            talker TEXT UNIQUE NOT NULL,
            name TEXT,
            analysis_prompt TEXT DEFAULT '',
            cursor_seq INTEGER DEFAULT 0,
            initialized INTEGER DEFAULT 0,
            poll_interval INTEGER DEFAULT 300,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS topics (
            id INTEGER PRIMARY KEY,
            group_id INTEGER REFERENCES groups(id),
            title TEXT NOT NULL,
            source TEXT DEFAULT 'manual',
            status TEXT DEFAULT 'pending',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS topic_messages (
            topic_id INTEGER REFERENCES topics(id),
            msg_seq INTEGER,
            talker TEXT,
            added_by TEXT DEFAULT 'ai',
            message_json TEXT,
            PRIMARY KEY (topic_id, msg_seq)
        );
        CREATE TABLE IF NOT EXISTS knowledge_docs (
            id INTEGER PRIMARY KEY,
            topic_id INTEGER UNIQUE REFERENCES topics(id),
            content TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            curated_at DATETIME
        );
        CREATE VIRTUAL TABLE IF NOT EXISTS knowledge_fts USING fts5(
            doc_id UNINDEXED,
            title,
            content
        );
        CREATE TABLE IF NOT EXISTS ai_tasks (
            id INTEGER PRIMARY KEY,
            group_id INTEGER REFERENCES groups(id),
            type TEXT,
            status TEXT,
            progress TEXT,
            error TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS app_settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL DEFAULT ''
        );
    """)


async def _fail_stale_work(db) -> None:
    """Mark summarize tasks and topics untouched for 15 minutes as errors."""
    await db.execute(
        """
        UPDATE ai_tasks
        SET status='error', error=?, updated_at=CURRENT_TIMESTAMP
        WHERE type='summarize'
          AND status='running'
          AND datetime(updated_at) <= datetime('now', '-15 minutes')
        """,
        (STALE_SUMMARIZE_ERROR,),
    )
    await db.execute(
        """
        UPDATE topics
        SET status='error', updated_at=CURRENT_TIMESTAMP
        WHERE status='processing'
          AND datetime(updated_at) <= datetime('now', '-15 minutes')
        """
    )
    await db.commit()


async def _add_missing_columns(db, path: str) -> None:
    """Add columns that older databases lack and backfill their values."""
    if "message_json" not in await _table_columns(db, "topic_messages"):
        await db.execute("ALTER TABLE topic_messages ADD COLUMN message_json TEXT")
        await db.commit()
        log.info("[init_db] added topic_messages.message_json in %s", path)

    if "analysis_prompt" not in await _table_columns(db, "groups"):
        await db.execute(
            "ALTER TABLE groups ADD COLUMN analysis_prompt TEXT DEFAULT ''"
        )
        await db.commit()
        log.info("[init_db] added groups.analysis_prompt in %s", path)

    if "source" not in await _table_columns(db, "topics"):
        await db.execute("ALTER TABLE topics ADD COLUMN source TEXT DEFAULT 'manual'")
        # Backfill: any user-added message makes the topic 'manual'; otherwise
        # any AI-added message makes it 'ai'; topics without messages stay
        # 'manual'.
        await db.execute(
            """
            UPDATE topics
            SET source = CASE
                WHEN EXISTS (
                    SELECT 1 FROM topic_messages tm
                    WHERE tm.topic_id = topics.id AND tm.added_by = 'user'
                ) THEN 'manual'
                WHEN EXISTS (
                    SELECT 1 FROM topic_messages tm
                    WHERE tm.topic_id = topics.id
                      AND COALESCE(tm.added_by, 'ai') = 'ai'
                ) THEN 'ai'
                ELSE 'manual'
            END
            """
        )
        await db.commit()
        log.info("[init_db] added topics.source in %s", path)

    if "curated_at" not in await _table_columns(db, "knowledge_docs"):
        await db.execute("ALTER TABLE knowledge_docs ADD COLUMN curated_at DATETIME")
        # Backfill: a doc updated after its creation gets curated_at set to
        # its updated_at.
        await db.execute(
            """
            UPDATE knowledge_docs
            SET curated_at = updated_at
            WHERE updated_at IS NOT NULL
              AND created_at IS NOT NULL
              AND updated_at > created_at
            """
        )
        await db.commit()
        log.info("[init_db] added knowledge_docs.curated_at in %s", path)


async def _topics_has_sequence_row(db) -> bool:
    """Return True when sqlite_sequence exists and has a row for ``topics``."""
    async with db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='sqlite_sequence'"
    ) as cur:
        if await cur.fetchone() is None:
            return False
    async with db.execute(
        "SELECT 1 FROM sqlite_sequence WHERE name='topics'"
    ) as cur:
        return await cur.fetchone() is not None


async def _migrate_topics_to_autoincrement(db, path: str) -> None:
    """Rebuild ``topics`` with an AUTOINCREMENT primary key if not done yet.

    Prevents SQLite rowid reuse from attaching an old knowledge_docs row to a
    newly created topic (the root cause of reports leaking across groups).
    Checked on every start; skipped once a sqlite_sequence row for ``topics``
    exists.
    """
    if await _topics_has_sequence_row(db):
        return

    await db.executescript("""
        BEGIN;
        CREATE TABLE topics_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            group_id INTEGER REFERENCES groups(id),
            title TEXT NOT NULL,
            source TEXT DEFAULT 'manual',
            status TEXT DEFAULT 'pending',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        INSERT INTO topics_new
            (id, group_id, title, source, status, created_at, updated_at)
        SELECT id, group_id, title, COALESCE(source, 'manual'), status,
               created_at, updated_at
        FROM topics;
        DROP TABLE topics;
        ALTER TABLE topics_new RENAME TO topics;
        COMMIT;
    """)
    # sqlite_sequence has no unique constraint, so INSERT OR REPLACE would add
    # a second 'topics' row next to the one carried over by the rename. Delete
    # first so exactly one row exists, including when topics is empty; that
    # row is what marks the migration as done.
    await db.execute("DELETE FROM sqlite_sequence WHERE name='topics'")
    await db.execute(
        "INSERT INTO sqlite_sequence(name, seq) "
        "SELECT 'topics', COALESCE(MAX(id), 0) FROM topics"
    )
    await db.commit()
    log.info("[init_db] migrated topics table to AUTOINCREMENT in %s", path)


async def _purge_invalid_docs(db) -> None:
    """Delete knowledge docs (and their FTS rows) that have no valid topic."""
    # Orphan cleanup: remove knowledge_docs whose topic_id is not in topics,
    # together with their FTS rows. Group deletion used to miss these two
    # tables, so this repair runs on every start.
    await db.execute("""
        DELETE FROM knowledge_fts
        WHERE doc_id IN (
            SELECT id FROM knowledge_docs
            WHERE topic_id NOT IN (SELECT id FROM topics)
        )
    """)
    await db.execute("""
        DELETE FROM knowledge_docs
        WHERE topic_id NOT IN (SELECT id FROM topics)
    """)
    # Mis-bound cleanup: a doc created before the topic it points to is a
    # leftover that a later topic picked up through rowid reuse. A legitimate
    # doc is always created after its topic.
    await db.execute("""
        DELETE FROM knowledge_fts
        WHERE doc_id IN (
            SELECT d.id
            FROM knowledge_docs d
            JOIN topics t ON t.id = d.topic_id
            WHERE d.created_at < t.created_at
        )
    """)
    await db.execute("""
        DELETE FROM knowledge_docs
        WHERE id IN (
            SELECT d.id
            FROM knowledge_docs d
            JOIN topics t ON t.id = d.topic_id
            WHERE d.created_at < t.created_at
        )
    """)
    await db.commit()


async def init_db(path=None):
    """Create, migrate and repair the database at *path*.

    *path* defaults to the active database. Steps, in order: create missing
    tables, fail stale running work, add missing columns, migrate ``topics``
    to AUTOINCREMENT, purge orphaned or mis-bound knowledge docs. On success
    the path is recorded in ``_initialized_dbs``.
    """
    if path is None:
        path = get_active_db_path()
    async with aiosqlite.connect(path) as db:
        await _create_schema(db)
        await _fail_stale_work(db)
        await _add_missing_columns(db, path)
        await _migrate_topics_to_autoincrement(db, path)
        await _purge_invalid_docs(db)
    _initialized_dbs.add(path)