Files
get_wechat/chatlog_fastAPI/services/media_parser.py

143 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import logging
import httpx
from fastapi import HTTPException
from services.ai_client import get_openai_client
from services.media_resolver import resolve_media
from services.runtime_settings import get_ai_settings
log = logging.getLogger(__name__)
async def _get_ai_client():
    """Await ``get_openai_client()`` and hand its result back unchanged.

    Callers in this module unpack the result as a two-item
    ``(client, settings)`` pair.
    """
    client_and_settings = await get_openai_client()
    return client_and_settings
async def parse_media(kind: str, key: str) -> dict:
    """Turn a single chatlog media object into text.

    Args:
        kind: Media category; one of ``"voice"``, ``"image"`` or ``"video"``.
        key: Chatlog media key handed to ``resolve_media``.

    Returns:
        A dict of the form ``{"text": <parsed text>}``.

    Raises:
        HTTPException: 400 when ``kind`` is unsupported or ``key`` is empty;
            503 when the AI API key or the model required for ``kind`` is
            missing from the AI settings.
    """
    if kind not in {"voice", "image", "video"}:
        raise HTTPException(400, "不支持的媒体类型")
    if not key:
        raise HTTPException(400, "媒体 key 不能为空")

    settings = await get_ai_settings()
    if not settings.get("ai_api_key"):
        raise HTTPException(503, "AI 服务未配置,请在设置页填写 AI API Key")

    # `kind` was validated above, so "not voice" means image or video.
    is_voice = kind == "voice"
    if is_voice and not settings.get("voice_model"):
        raise HTTPException(503, "语音模型未配置,请在设置页填写语音模型名称,例如 paraformer-v2")
    if not is_voice and not settings.get("vision_model"):
        raise HTTPException(503, "视觉模型未配置,请在设置页填写视觉模型名称,例如 qwen-vl-plus")

    resolved = await resolve_media(kind, key)
    if is_voice:
        text = await _parse_voice(resolved.bytes, resolved.content_type)
    else:
        text = await _parse_visual(kind, resolved.bytes, resolved.content_type)
    return {"text": text}
def _guess_audio_mime(content_type: str) -> str:
    """Pick the audio MIME type for the data URI from a Content-Type string."""
    lowered = (content_type or "").lower()
    # "x-silk" contains "silk", so one substring test covers both spellings.
    if "silk" in lowered:
        return "audio/silk"
    if "amr" in lowered:
        return "audio/amr"
    if "ogg" in lowered or "opus" in lowered:
        return "audio/ogg"
    if "wav" in lowered:
        return "audio/wav"
    return "audio/mpeg"


def _asr_json(response, stage: str) -> dict:
    """Decode an HTTP response body as a JSON object or raise HTTPException(500)."""
    try:
        body = response.json()
    except ValueError as exc:  # JSON decode errors are ValueError subclasses
        raise HTTPException(
            500, f"{stage}: 响应不是合法的 JSON (HTTP {response.status_code})"
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            500, f"{stage}: 响应不是 JSON 对象 (HTTP {response.status_code})"
        )
    return body


async def _parse_voice(media_bytes: bytes, content_type: str) -> str:
    """Transcribe a voice clip through the DashScope asynchronous ASR API.

    The audio is embedded as a base64 ``data:`` URI, submitted as an async
    transcription task, and the task is then polled once per second for at
    most 30 attempts.

    Args:
        media_bytes: Raw audio bytes.
        content_type: Content type of the audio; used only to choose the MIME
            type of the data URI. An empty or ``None`` value falls back to
            ``audio/mpeg``.

    Returns:
        The recognized text, or the placeholder ``"(识别结果为空)"`` when the
        task succeeds without producing any text.

    Raises:
        HTTPException: 500 when the submission is rejected, no ``task_id`` is
            returned, a response is not a JSON object, the task ends as
            FAILED/CANCELLED, or no terminal state is seen within the polling
            budget.
    """
    # Kept function-local (as the original did) so this block does not rely on
    # a file-level import; hoisted out of the polling loop.
    import asyncio

    b64_audio = base64.b64encode(media_bytes).decode()
    data_uri = f"data:{_guess_audio_mime(content_type)};base64,{b64_audio}"

    # Only the settings half of the pair is needed; requests go through httpx.
    _, ai = await _get_ai_client()
    asr_headers = {
        "Authorization": f"Bearer {ai['ai_api_key']}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=60) as http:
        submit = await http.post(
            "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription",
            headers={**asr_headers, "X-DashScope-Async": "enable"},
            json={
                "model": ai["voice_model"],
                "input": {"file_urls": [data_uri]},
                "parameters": {"language_hints": ["zh", "en"]},
            },
            timeout=30,
        )
        submit_data = _asr_json(submit, "提交识别任务失败")
        if submit.status_code not in (200, 201):
            raise HTTPException(
                500, f"提交识别任务失败: {submit_data.get('message', submit_data)}"
            )

        # `or {}` also covers an explicit `"output": null` in the payload.
        task_id = (submit_data.get("output") or {}).get("task_id")
        if not task_id:
            raise HTTPException(500, f"未获取到 task_id: {submit_data}")

        for _ in range(30):
            await asyncio.sleep(1)
            poll = await http.get(
                f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}",
                headers=asr_headers,
                timeout=10,
            )
            poll_data = _asr_json(poll, "查询识别任务失败")
            output = poll_data.get("output") or {}
            status = output.get("task_status", "")

            if status == "SUCCEEDED":
                results = output.get("results") or []
                log.info("[media_parser] ASR SUCCEEDED results: %s", results)
                if not results:
                    return "(识别结果为空)"
                trans_url = results[0].get("transcription_url", "")
                if trans_url:
                    # The transcript itself lives behind a separate URL.
                    trans_resp = await http.get(trans_url, timeout=10)
                    trans_data = _asr_json(trans_resp, "获取识别结果失败")
                    log.info(
                        "[media_parser] transcription_url content: %s",
                        str(trans_data)[:500],
                    )
                    transcripts = trans_data.get("transcripts") or []
                    text = transcripts[0].get("text", "") if transcripts else ""
                else:
                    text = results[0].get("transcription", "")
                return text or "(识别结果为空)"

            if status in ("FAILED", "CANCELLED"):
                raise HTTPException(
                    500, f"识别任务失败: {output.get('message', status)}"
                )
            # Any other status means the task is still in progress: keep polling.

        raise HTTPException(500, "语音识别超时30秒")
async def _parse_visual(kind: str, media_bytes: bytes, content_type: str) -> str:
    """Ask the configured vision model for a short Chinese description.

    Args:
        kind: ``"image"`` selects the image prompt; any other value selects
            the video-screenshot prompt.
        media_bytes: Raw bytes that are base64-embedded in a ``data:`` URL.
        content_type: Content type of the media; used only to choose between
            ``image/png``, ``image/webp`` and the ``image/jpeg`` fallback.

    Returns:
        The content of the first completion choice, or ``""`` when that
        content is empty.
    """
    encoded = base64.b64encode(media_bytes).decode()
    lowered = content_type.lower()
    # First matching format wins (png before webp); anything else is JPEG.
    mime = next(
        (f"image/{fmt}" for fmt in ("png", "webp") if fmt in lowered),
        "image/jpeg",
    )
    data_url = f"data:{mime};base64,{encoded}"

    if kind == "image":
        prompt = "请用中文简洁描述这张图片的内容。"
    else:
        prompt = "请用中文简洁描述这个视频截图的内容。"

    client, ai = await _get_ai_client()
    user_message = {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": data_url}},
            {"type": "text", "text": prompt},
        ],
    }
    completion = await client.chat.completions.create(
        model=ai["vision_model"],
        messages=[user_message],
        max_tokens=300,
    )
    return completion.choices[0].message.content or ""