feat(api): 添加万川平台模型配置获取和同步功能 - 新增 getWanchuanModelConfig 函数,按模型编码获取平台模型配置 - 新增 syncWanchuanModelToSettings 函数,从万川平台拉取模型配置并写入后端 AI 设置 - 支持按用途分多个模型编码(generic/vision/voice)分别同步配置 - 配置失败时跳过对应字段,不影响其他模型同步 feat(settings): 重构AI模型配置界面支持多模块分组 - 将AI配置按话题分析、报告生成、视觉、语音四个模块分组展示 - 每个模块独立配置接口地址、密钥和模型名称 - 添加从万川平台获取配置的按钮和同步功能 - 优化配置状态指示和错误提示信息 refactor(config): 扩展AI配置支持独立的语音视觉报告网关 - 新增 voice_base_url/voice_api_key 配置项 - 新增 vision_base_url/vision_api_key 配置项 - 新增 summary_base_url/summary_api_key 配置项 - 留空时回退到 ai_base_url/ai_api_key 兼容单网关场景 refactor(http): 统一使用共享HTTP客户端减少连接开销 - 替换各处 httpx.AsyncClient 为 shared_client - 在 lifespan 中正确关闭共享客户端资源 - 优化 get_current_wxid 和 health 检查中的HTTP请求 refactor(ai): 按用途缓存AI客户端支持不同网关配置 - 重构 get_openai_client 支持按(base_url, api_key)缓存 - 新增 get_client_for 函数按用途获取对应客户端 - 支持语音、视觉、报告等不同用途使用独立网关和密钥 ```
306 lines
12 KiB
Python
306 lines
12 KiB
Python
import aiosqlite
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from pathlib import Path
|
||
from config import settings
|
||
from services.http_client import shared_client
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
_data_db_dir = Path(settings.db_path).resolve().parent
|
||
_data_db_dir.mkdir(parents=True, exist_ok=True)
|
||
_current_db_path = str(Path(settings.db_path).resolve())
|
||
_initialized_dbs = set()
|
||
|
||
_resolved_wxid: str | None = None
|
||
_wxid_last_resolved: float = 0.0
|
||
_WXID_TTL = 60.0 # 60 秒后强制重新检测,确保账号切换能被感知
|
||
STALE_SUMMARIZE_ERROR = "AI 报告生成任务超过 15 分钟未完成,已自动标记为失败,可重新生成"
|
||
|
||
def _db_path_for_wxid(wxid: str) -> str:
|
||
if wxid and wxid != "default":
|
||
safe = "".join(c for c in wxid if c.isalnum() or c in ("_", "-"))
|
||
return str((_data_db_dir / f"knowledge_{safe}.db").resolve())
|
||
return str(Path(settings.db_path).resolve())
|
||
|
||
|
||
def reset_wxid_cache():
|
||
global _resolved_wxid, _wxid_last_resolved
|
||
_resolved_wxid = None
|
||
_wxid_last_resolved = 0.0
|
||
|
||
|
||
async def get_current_wxid(force: bool = False):
|
||
global _resolved_wxid, _wxid_last_resolved
|
||
now = time.time()
|
||
# 已有有效缓存且未超时,直接返回
|
||
if (
|
||
not force
|
||
and _resolved_wxid
|
||
and _resolved_wxid != "default"
|
||
and (now - _wxid_last_resolved) < _WXID_TTL
|
||
):
|
||
return _resolved_wxid
|
||
# 重新解析当前 wxid
|
||
base = settings.chatlog_base_url
|
||
client = shared_client()
|
||
try:
|
||
r = await client.get(f"{base}/api/v1/chatlog", params={"talker": "filehelper", "limit": 100, "time": "1970-01-01,2099-12-31", "format": "json"}, timeout=10.0)
|
||
if r.status_code == 200:
|
||
data = r.json()
|
||
for msg in data.get("items", []):
|
||
if msg.get("isSelf"):
|
||
_resolved_wxid = msg.get("sender")
|
||
_wxid_last_resolved = time.time()
|
||
return _resolved_wxid
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
r = await client.get(f"{base}/api/v1/chatroom", params={"limit": 10, "format": "json"}, timeout=10.0)
|
||
if r.status_code == 200:
|
||
rooms = r.json().get("items", [])
|
||
for room in rooms:
|
||
room_id = room.get("name")
|
||
r2 = await client.get(f"{base}/api/v1/chatlog", params={"talker": room_id, "limit": 50, "time": "1970-01-01,2099-12-31", "format": "json"}, timeout=10.0)
|
||
if r2.status_code == 200:
|
||
data2 = r2.json()
|
||
for msg in data2.get("items", []):
|
||
if msg.get("isSelf"):
|
||
_resolved_wxid = msg.get("sender")
|
||
_wxid_last_resolved = time.time()
|
||
return _resolved_wxid
|
||
except Exception:
|
||
pass
|
||
if force:
|
||
reset_wxid_cache()
|
||
return "default"
|
||
|
||
async def update_db_path(force: bool = False):
|
||
global _current_db_path
|
||
wxid = await get_current_wxid(force=force)
|
||
new_path = _db_path_for_wxid(wxid)
|
||
if new_path != _current_db_path:
|
||
log.info(f"Switching database to {new_path}")
|
||
_current_db_path = new_path
|
||
await init_db(new_path)
|
||
# 账号切换:清掉 chatlog 客户端的 contact 库路径与头像缓存,避免显示上一个账号的头像。
|
||
# 局部导入避免潜在的模块循环引用。
|
||
from services.chatlog_client import chatlog_client
|
||
chatlog_client.reset_account_cache()
|
||
return _current_db_path
|
||
|
||
def get_active_db_path():
|
||
return _current_db_path
|
||
|
||
async def get_db():
|
||
path = get_active_db_path()
|
||
if path not in _initialized_dbs:
|
||
await init_db(path)
|
||
async with aiosqlite.connect(path) as db:
|
||
db.row_factory = aiosqlite.Row
|
||
yield db
|
||
|
||
async def init_db(path=None):
|
||
if path is None:
|
||
path = get_active_db_path()
|
||
async with aiosqlite.connect(path) as db:
|
||
await db.executescript("""
|
||
CREATE TABLE IF NOT EXISTS groups (
|
||
id INTEGER PRIMARY KEY,
|
||
talker TEXT UNIQUE NOT NULL,
|
||
name TEXT,
|
||
analysis_prompt TEXT DEFAULT '',
|
||
cursor_seq INTEGER DEFAULT 0,
|
||
initialized INTEGER DEFAULT 0,
|
||
poll_interval INTEGER DEFAULT 300,
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||
);
|
||
CREATE TABLE IF NOT EXISTS topics (
|
||
id INTEGER PRIMARY KEY,
|
||
group_id INTEGER REFERENCES groups(id),
|
||
title TEXT NOT NULL,
|
||
source TEXT DEFAULT 'manual',
|
||
status TEXT DEFAULT 'pending',
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||
);
|
||
CREATE TABLE IF NOT EXISTS topic_messages (
|
||
topic_id INTEGER REFERENCES topics(id),
|
||
msg_seq INTEGER,
|
||
talker TEXT,
|
||
added_by TEXT DEFAULT 'ai',
|
||
message_json TEXT,
|
||
PRIMARY KEY (topic_id, msg_seq)
|
||
);
|
||
CREATE TABLE IF NOT EXISTS knowledge_docs (
|
||
id INTEGER PRIMARY KEY,
|
||
topic_id INTEGER UNIQUE REFERENCES topics(id),
|
||
content TEXT,
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
curated_at DATETIME
|
||
);
|
||
CREATE VIRTUAL TABLE IF NOT EXISTS knowledge_fts USING fts5(
|
||
doc_id UNINDEXED,
|
||
title,
|
||
content
|
||
);
|
||
CREATE TABLE IF NOT EXISTS ai_tasks (
|
||
id INTEGER PRIMARY KEY,
|
||
group_id INTEGER REFERENCES groups(id),
|
||
type TEXT,
|
||
status TEXT,
|
||
progress TEXT,
|
||
error TEXT,
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||
);
|
||
CREATE TABLE IF NOT EXISTS app_settings (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT NOT NULL DEFAULT ''
|
||
);
|
||
""")
|
||
await db.execute(
|
||
"""
|
||
UPDATE ai_tasks
|
||
SET status='error', error=?, updated_at=CURRENT_TIMESTAMP
|
||
WHERE type='summarize'
|
||
AND status='running'
|
||
AND datetime(updated_at) <= datetime('now', '-15 minutes')
|
||
""",
|
||
(STALE_SUMMARIZE_ERROR,),
|
||
)
|
||
await db.execute(
|
||
"""
|
||
UPDATE topics
|
||
SET status='error', updated_at=CURRENT_TIMESTAMP
|
||
WHERE status='processing'
|
||
AND datetime(updated_at) <= datetime('now', '-15 minutes')
|
||
"""
|
||
)
|
||
await db.commit()
|
||
|
||
async with db.execute("PRAGMA table_info(topic_messages)") as cur:
|
||
topic_message_cols = {row[1] for row in await cur.fetchall()}
|
||
if "message_json" not in topic_message_cols:
|
||
await db.execute("ALTER TABLE topic_messages ADD COLUMN message_json TEXT")
|
||
await db.commit()
|
||
log.info(f"[init_db] added topic_messages.message_json in {path}")
|
||
|
||
async with db.execute("PRAGMA table_info(groups)") as cur:
|
||
group_cols = {row[1] for row in await cur.fetchall()}
|
||
if "analysis_prompt" not in group_cols:
|
||
await db.execute("ALTER TABLE groups ADD COLUMN analysis_prompt TEXT DEFAULT ''")
|
||
await db.commit()
|
||
log.info(f"[init_db] added groups.analysis_prompt in {path}")
|
||
|
||
async with db.execute("PRAGMA table_info(topics)") as cur:
|
||
topic_cols = {row[1] for row in await cur.fetchall()}
|
||
if "source" not in topic_cols:
|
||
await db.execute("ALTER TABLE topics ADD COLUMN source TEXT DEFAULT 'manual'")
|
||
await db.execute(
|
||
"""
|
||
UPDATE topics
|
||
SET source = CASE
|
||
WHEN EXISTS (
|
||
SELECT 1 FROM topic_messages tm
|
||
WHERE tm.topic_id = topics.id AND tm.added_by = 'user'
|
||
) THEN 'manual'
|
||
WHEN EXISTS (
|
||
SELECT 1 FROM topic_messages tm
|
||
WHERE tm.topic_id = topics.id AND COALESCE(tm.added_by, 'ai') = 'ai'
|
||
) THEN 'ai'
|
||
ELSE 'manual'
|
||
END
|
||
"""
|
||
)
|
||
await db.commit()
|
||
log.info(f"[init_db] added topics.source in {path}")
|
||
|
||
async with db.execute("PRAGMA table_info(knowledge_docs)") as cur:
|
||
knowledge_cols = {row[1] for row in await cur.fetchall()}
|
||
if "curated_at" not in knowledge_cols:
|
||
await db.execute("ALTER TABLE knowledge_docs ADD COLUMN curated_at DATETIME")
|
||
await db.execute(
|
||
"""
|
||
UPDATE knowledge_docs
|
||
SET curated_at = updated_at
|
||
WHERE updated_at IS NOT NULL
|
||
AND created_at IS NOT NULL
|
||
AND updated_at > created_at
|
||
"""
|
||
)
|
||
await db.commit()
|
||
log.info(f"[init_db] added knowledge_docs.curated_at in {path}")
|
||
|
||
# 迁移 topics 表到 AUTOINCREMENT,防止 SQLite rowid 复用导致旧 knowledge_docs
|
||
# 被新建话题"接"上(跨群串报告的根因)。每次启动检测一次,已迁移则跳过。
|
||
async with db.execute(
|
||
"SELECT name FROM sqlite_master WHERE type='table' AND name='sqlite_sequence'"
|
||
) as cur:
|
||
has_seq_tbl = await cur.fetchone() is not None
|
||
needs_migrate = True
|
||
if has_seq_tbl:
|
||
async with db.execute(
|
||
"SELECT 1 FROM sqlite_sequence WHERE name='topics'"
|
||
) as cur:
|
||
if await cur.fetchone():
|
||
needs_migrate = False
|
||
if needs_migrate:
|
||
await db.executescript("""
|
||
BEGIN;
|
||
CREATE TABLE topics_new (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
group_id INTEGER REFERENCES groups(id),
|
||
title TEXT NOT NULL,
|
||
source TEXT DEFAULT 'manual',
|
||
status TEXT DEFAULT 'pending',
|
||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||
);
|
||
INSERT INTO topics_new (id, group_id, title, source, status, created_at, updated_at)
|
||
SELECT id, group_id, title, COALESCE(source, 'manual'), status, created_at, updated_at FROM topics;
|
||
DROP TABLE topics;
|
||
ALTER TABLE topics_new RENAME TO topics;
|
||
COMMIT;
|
||
""")
|
||
await db.execute(
|
||
"INSERT OR REPLACE INTO sqlite_sequence(name, seq) "
|
||
"SELECT 'topics', COALESCE(MAX(id), 0) FROM topics"
|
||
)
|
||
await db.commit()
|
||
log.info(f"[init_db] migrated topics table to AUTOINCREMENT in {path}")
|
||
|
||
# 孤儿数据清理:删除 topic_id 不存在于 topics 的 knowledge_docs 及其 FTS。
|
||
# 历史上删群时遗漏过这两张表,需要每次启动幂等修复。
|
||
await db.execute("""
|
||
DELETE FROM knowledge_fts WHERE doc_id IN (
|
||
SELECT id FROM knowledge_docs
|
||
WHERE topic_id NOT IN (SELECT id FROM topics)
|
||
)
|
||
""")
|
||
await db.execute("""
|
||
DELETE FROM knowledge_docs
|
||
WHERE topic_id NOT IN (SELECT id FROM topics)
|
||
""")
|
||
# 错绑数据清理:doc 创建时间早于其指向的 topic 创建时间,说明 doc 是历史残留、
|
||
# topic 是后建的(rowid 复用),doc 应清掉。合法 doc 必然在 topic 之后生成。
|
||
await db.execute("""
|
||
DELETE FROM knowledge_fts WHERE doc_id IN (
|
||
SELECT d.id FROM knowledge_docs d
|
||
JOIN topics t ON t.id = d.topic_id
|
||
WHERE d.created_at < t.created_at
|
||
)
|
||
""")
|
||
await db.execute("""
|
||
DELETE FROM knowledge_docs WHERE id IN (
|
||
SELECT d.id FROM knowledge_docs d
|
||
JOIN topics t ON t.id = d.topic_id
|
||
WHERE d.created_at < t.created_at
|
||
)
|
||
""")
|
||
await db.commit()
|
||
_initialized_dbs.add(path)
|