1306 lines
61 KiB
Python
1306 lines
61 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from time import perf_counter
|
||
from uuid import uuid4
|
||
|
||
from app.plugins.base import PluginRegistry
|
||
from app.schemas.chat import ChatRequest, ChatResponse, FillSlotsRequest
|
||
from app.schemas.debug import IntentCandidate, MatcherStageDebug, RoutingDebug
|
||
from app.schemas.workflow import MissingSlot, Workflow, WorkflowStep
|
||
from app.services.dialog_act import DialogActEngine
|
||
from app.services.dialog_rules import DialogRuleEngine
|
||
from app.services.intent_registry import IntentRegistry
|
||
from app.services.knowledge_llm import DashScopeKnowledgeLLM
|
||
from app.services.planner import PlanningResult, WorkflowPlanner
|
||
from app.services.response_policy import ResponsePolicy
|
||
from app.services.rewrite_engine import ContextRewriteEngine, RewriteResult
|
||
from app.services.router import Router
|
||
from app.services.session_store import InMemorySessionStore, SessionState
|
||
from app.services.social import SocialResponder, SocialRouter
|
||
|
||
|
||
class AgentService:
|
||
_MUSIC_GENRE_HINTS = ("轻音乐", "摇滚", "古典", "民谣", "爵士", "流行", "儿歌")
|
||
|
||
def __init__(
|
||
self,
|
||
intent_registry: IntentRegistry,
|
||
router: Router,
|
||
plugins: PluginRegistry,
|
||
session_store: InMemorySessionStore | None = None,
|
||
rewrite_engine: ContextRewriteEngine | None = None,
|
||
response_policy: ResponsePolicy | None = None,
|
||
dialog_rules: DialogRuleEngine | None = None,
|
||
dialog_act_engine: DialogActEngine | None = None,
|
||
planner: WorkflowPlanner | None = None,
|
||
social_router: SocialRouter | None = None,
|
||
social_responder: SocialResponder | None = None,
|
||
knowledge_llm: DashScopeKnowledgeLLM | None = None,
|
||
) -> None:
|
||
self.session_store = session_store or InMemorySessionStore()
|
||
self.intent_registry = intent_registry
|
||
self.router = router
|
||
self.plugins = plugins
|
||
self.rewrite_engine = rewrite_engine or ContextRewriteEngine()
|
||
self.response_policy = response_policy or ResponsePolicy()
|
||
self.dialog_rules = dialog_rules or DialogRuleEngine()
|
||
self.dialog_act_engine = dialog_act_engine or DialogActEngine()
|
||
self.planner = planner
|
||
self.social_router = social_router or SocialRouter()
|
||
self.social_responder = social_responder
|
||
self.knowledge_llm = knowledge_llm
|
||
|
||
def handle_chat(self, request: ChatRequest) -> ChatResponse:
    """Process one chat turn and return the reply with latency telemetry.

    Stages are tried in order and the first one that produces a response
    ends the turn: stop request, pending confirmation, active-task
    continuation, social turn, then rewrite -> route followed by the
    knowledge shortcut, the planner, the route decision, the knowledge
    fallback, the generic fallback and finally slot extraction for the
    routed intent. The duration of each stage is stored in milliseconds in
    the breakdown dict that ``_finalize_response`` attaches to the reply.
    """
    started_at = perf_counter()
    breakdown: dict[str, float] = {}
    user_text = request.input_text

    def conclude(response, rewrite=None, *, record=True, timed=True):
        """Record the turn, persist the session and finalize the response.

        ``rewrite=None`` records the turn with an identity rewrite of the
        user text. ``record=False`` skips the turn record (the producing
        stage already recorded it). ``timed=False`` leaves the
        record/save durations out of the breakdown.
        """
        first_response_at = perf_counter()
        if record:
            tick = perf_counter()
            turn_rewrite = rewrite if rewrite is not None else RewriteResult(user_text, user_text)
            self._record_turn(session, user_text, response.reply_text, turn_rewrite)
            if timed:
                self._mark_timing(breakdown, "record_turn_ms", tick)
        tick = perf_counter()
        self.session_store.save(session)
        if timed:
            self._mark_timing(breakdown, "session_save_ms", tick)
        return self._finalize_response(response, started_at, breakdown, first_response_at)

    tick = perf_counter()
    session = self.session_store.get_or_create(
        session_id=request.session_id,
        user_id=request.user_id,
        channel=request.channel,
    )
    self._mark_timing(breakdown, "session_get_or_create_ms", tick)

    tick = perf_counter()
    self._update_dialog_act(session, user_text)
    self._mark_timing(breakdown, "dialog_act_ms", tick)

    # 1. Explicit stop of the active task.
    tick = perf_counter()
    stop_response = self._handle_stop_request(session, user_text)
    self._mark_timing(breakdown, "stop_check_ms", tick)
    if stop_response is not None:
        return conclude(stop_response)

    # 2. A pending confirmation is only handled here when the input parses
    #    as a confirmation decision; otherwise the turn keeps flowing.
    if (
        session.status == "waiting_confirmation"
        and self._parse_confirmation_decision(user_text) is not None
    ):
        tick = perf_counter()
        confirmation_response = self._handle_confirmation(session, user_text)
        self._mark_timing(breakdown, "confirmation_ms", tick)
        return conclude(confirmation_response)

    # 3. Continuation of an already active task.
    tick = perf_counter()
    continued_response = self._continue_active_task(session, user_text)
    self._mark_timing(breakdown, "active_task_ms", tick)
    if continued_response is not None:
        return conclude(continued_response)

    # 4. Social turn.
    tick = perf_counter()
    social_response = self._handle_social_turn(session, user_text)
    self._mark_timing(breakdown, "social_route_ms", tick)
    if social_response is not None:
        return conclude(social_response)

    # 5. Context rewrite followed by routing on the rewritten text.
    tick = perf_counter()
    rewrite_result = self.rewrite_engine.rewrite(user_text, session)
    rewrite_elapsed_ms = round((perf_counter() - tick) * 1000, 3)
    breakdown["rewrite_ms"] = rewrite_elapsed_ms

    tick = perf_counter()
    route_result = self.router.route(rewrite_result.rewritten_text)
    self._mark_timing(breakdown, "route_ms", tick)
    routing_debug = route_result.debug
    self._attach_rewrite_debug(routing_debug, rewrite_result, rewrite_elapsed_ms)

    tick = perf_counter()
    # When the BERT signal is unstable, search the knowledge base first
    # (before the planner). decision=route_to_cloud means BERT had low
    # confidence and the request would go to the cloud planner; if the
    # local MD knowledge base keywords already hit, go straight to
    # knowledge_llm and skip the cloud planner.
    if routing_debug.decision == "route_to_cloud" and self.knowledge_llm is not None:
        quick_hits = self.knowledge_llm._store.search(rewrite_result.rewritten_text, top_k=1)
        # Threshold: at least one title hit or several body-word hits.
        if quick_hits and quick_hits[0].score >= 3.0:
            early_knowledge = self._try_knowledge_llm(
                session, user_text, routing_debug, rewrite_result
            )
            if early_knowledge is not None:
                self._mark_timing(breakdown, "knowledge_llm_ms", tick)
                return conclude(early_knowledge, rewrite_result, timed=False)

    # NOTE(review): "planner_ms" shares the timer started before the
    # knowledge shortcut above, so it also covers that lookup.
    planning_result = self._plan_if_needed(session, rewrite_result.rewritten_text, routing_debug)
    self._mark_timing(breakdown, "planner_ms", tick)
    if planning_result is not None and planning_result.accepted and planning_result.steps:
        tick = perf_counter()
        planned_response = self._start_planned_workflow(
            session, planning_result, routing_debug, user_text, rewrite_result
        )
        self._mark_timing(breakdown, "response_build_ms", tick)
        return conclude(planned_response, record=False)

    # 6. Clarify / reject / route-to-cloud decisions; the handler records
    #    the turn itself.
    tick = perf_counter()
    decision_response = self._handle_route_decision(
        session, routing_debug, user_text, rewrite_result
    )
    self._mark_timing(breakdown, "decision_response_ms", tick)
    if decision_response is not None:
        return conclude(decision_response, record=False)

    intent = route_result.intent
    if intent is None:
        tick = perf_counter()
        # BERT miss: try LLM + knowledge_search function call as a last
        # resort before the generic fallback.
        knowledge_response = self._try_knowledge_llm(
            session, user_text, routing_debug, rewrite_result
        )
        if knowledge_response is not None:
            self._mark_timing(breakdown, "knowledge_llm_ms", tick)
            return conclude(knowledge_response, rewrite_result, timed=False)
        fallback = self._fallback_response(session.session_id, routing_debug)
        self._mark_timing(breakdown, "response_build_ms", tick)
        return conclude(fallback, rewrite_result)

    # 7. A routed intent: reset task state when the intent changed, then
    #    extract slots and build the reply from the session.
    if session.current_intent != intent.intent_id:
        session.pending_slots = []
        session.slots = {}
        session.workflow = None

    session.current_intent = intent.intent_id
    session.status = "understanding"

    tick = perf_counter()
    extracted_slots = self.router.extract_slots(rewrite_result.rewritten_text, intent)
    self._mark_timing(breakdown, "slot_extract_ms", tick)

    routing_debug.extracted_slots = extracted_slots.copy()
    session.routing_debug = routing_debug.model_dump()
    session.slots.update(extracted_slots)
    self._update_context_memory(session, intent.intent_id, extracted_slots)

    tick = perf_counter()
    final_response = self._build_response_from_session(session)
    self._mark_timing(breakdown, "response_build_ms", tick)
    return conclude(final_response, rewrite_result)
|
||
|
||
def _handle_route_decision(
|
||
self,
|
||
session: SessionState,
|
||
routing_debug: RoutingDebug,
|
||
user_text: str,
|
||
rewrite_result: RewriteResult,
|
||
) -> ChatResponse | None:
|
||
if routing_debug.decision == "execute":
|
||
return None
|
||
if routing_debug.decision == "clarify":
|
||
response = ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="clarify",
|
||
reply_text=self.response_policy.clarify(self._top_candidate_ids(routing_debug)),
|
||
intent=routing_debug.selected_intent,
|
||
domain=self._domain_for_intent(routing_debug.selected_intent),
|
||
decision=routing_debug.decision,
|
||
decision_reason=routing_debug.decision_reason,
|
||
status="clarify",
|
||
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
self._record_turn(session, user_text, response.reply_text, rewrite_result)
|
||
return response
|
||
if routing_debug.decision == "reject":
|
||
response = ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="reject",
|
||
reply_text=self.response_policy.reject(),
|
||
decision=routing_debug.decision,
|
||
decision_reason=routing_debug.decision_reason,
|
||
status="rejected",
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
self._record_turn(session, user_text, response.reply_text, rewrite_result)
|
||
return response
|
||
if routing_debug.decision == "route_to_cloud":
|
||
candidate_ids = self._top_candidate_ids(routing_debug)
|
||
planner_stage = next((stage for stage in reversed(routing_debug.stages) if stage.stage == "planner"), None)
|
||
planner_reason = planner_stage.reason if planner_stage is not None else routing_debug.decision_reason
|
||
if self._planner_indicates_out_of_scope(planner_stage):
|
||
response = ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="reject",
|
||
reply_text=self.response_policy.reject(),
|
||
intent=routing_debug.selected_intent,
|
||
domain=self._domain_for_intent(routing_debug.selected_intent),
|
||
decision="reject",
|
||
decision_reason=planner_reason,
|
||
|
||
status="rejected",
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
elif candidate_ids:
|
||
response = ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="clarify",
|
||
reply_text=self.response_policy.clarify(candidate_ids),
|
||
intent=routing_debug.selected_intent,
|
||
domain=self._domain_for_intent(routing_debug.selected_intent),
|
||
decision=routing_debug.decision,
|
||
decision_reason=planner_reason,
|
||
|
||
status="route_to_cloud",
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
else:
|
||
response = ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="fallback",
|
||
reply_text=self.response_policy.fallback(),
|
||
intent=routing_debug.selected_intent,
|
||
domain=self._domain_for_intent(routing_debug.selected_intent),
|
||
decision=routing_debug.decision,
|
||
decision_reason=planner_reason,
|
||
|
||
status="route_to_cloud",
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
self._record_turn(session, user_text, response.reply_text, rewrite_result)
|
||
return response
|
||
return None
|
||
|
||
def handle_fill_slots(self, request: FillSlotsRequest) -> ChatResponse:
    """Process a follow-up turn that supplies slots for an existing session.

    Replies with a fallback when the session is unknown or has no current
    intent. Otherwise the stop, confirmation, active-task and social
    stages are tried in order; if none answers, slots are extracted from
    the input for the session's current intent and the reply is built
    either by continuing a non-"single" workflow or from the session state.
    The duration of each stage is stored in milliseconds in the breakdown
    dict that ``_finalize_response`` attaches to the reply.
    """
    started_at = perf_counter()
    breakdown: dict[str, float] = {}
    user_text = request.input_text

    def conclude(response):
        """Record the turn and save (when a session exists), then finalize."""
        first_response_at = perf_counter()
        if session is not None:
            tick = perf_counter()
            self._record_turn(
                session, user_text, response.reply_text, RewriteResult(user_text, user_text)
            )
            self._mark_timing(breakdown, "record_turn_ms", tick)
            tick = perf_counter()
            self.session_store.save(session)
            self._mark_timing(breakdown, "session_save_ms", tick)
        return self._finalize_response(response, started_at, breakdown, first_response_at)

    tick = perf_counter()
    session = self.session_store.get(request.session_id)
    self._mark_timing(breakdown, "session_get_ms", tick)

    # Nothing to fill: unknown session or no intent in progress.
    if session is None or session.current_intent is None:
        tick = perf_counter()
        fallback = ChatResponse(
            session_id=request.session_id,
            reply_type="fallback",
            reply_text=self.response_policy.fallback(),
            status="fallback",
            routing_debug=None,
            trace_id=self._trace_id(),
        )
        self._mark_timing(breakdown, "response_build_ms", tick)
        return conclude(fallback)

    tick = perf_counter()
    self._update_dialog_act(session, user_text)
    self._mark_timing(breakdown, "dialog_act_ms", tick)

    tick = perf_counter()
    stop_response = self._handle_stop_request(session, user_text)
    self._mark_timing(breakdown, "stop_check_ms", tick)
    if stop_response is not None:
        return conclude(stop_response)

    # First confirmation pass: only when the input parses as a decision.
    if (
        session.status == "waiting_confirmation"
        and self._parse_confirmation_decision(user_text) is not None
    ):
        tick = perf_counter()
        confirmation_response = self._handle_confirmation(session, user_text)
        self._mark_timing(breakdown, "confirmation_ms", tick)
        return conclude(confirmation_response)

    tick = perf_counter()
    continued_response = self._continue_active_task(session, user_text)
    self._mark_timing(breakdown, "active_task_ms", tick)
    if continued_response is not None:
        return conclude(continued_response)

    tick = perf_counter()
    social_response = self._handle_social_turn(session, user_text)
    self._mark_timing(breakdown, "social_route_ms", tick)
    if social_response is not None:
        return conclude(social_response)

    # Second confirmation pass: still waiting and no earlier stage
    # answered, so the confirmation handler gets the input regardless of
    # whether it parsed as a decision.
    if session.status == "waiting_confirmation":
        tick = perf_counter()
        confirmation_response = self._handle_confirmation(session, user_text)
        self._mark_timing(breakdown, "confirmation_ms", tick)
        return conclude(confirmation_response)

    # Slot extraction is common to the planned-workflow and single-intent
    # branches below.
    existing_workflow = self._workflow_from_session(session)
    intent = self.intent_registry.get(session.current_intent)

    tick = perf_counter()
    extracted_slots = self._extract_continuation_slots(session, intent, user_text)
    self._mark_timing(breakdown, "slot_extract_ms", tick)
    session.slots.update(extracted_slots)
    self._update_context_memory(session, intent.intent_id, extracted_slots)

    if existing_workflow is not None and existing_workflow.workflow_type != "single":
        tick = perf_counter()
        planned_response = self._continue_planned_workflow(
            session,
            existing_workflow,
            self._routing_debug_from_session(session),
        )
        self._mark_timing(breakdown, "response_build_ms", tick)
        return conclude(planned_response)

    # Keep the stored routing debug in sync with the merged slots.
    if session.routing_debug is not None:
        routing_debug = RoutingDebug.model_validate(session.routing_debug)
        routing_debug.extracted_slots = session.slots.copy()
        session.routing_debug = routing_debug.model_dump()

    tick = perf_counter()
    final_response = self._build_response_from_session(session)
    self._mark_timing(breakdown, "response_build_ms", tick)
    return conclude(final_response)
|
||
|
||
def _build_response_from_session(self, session: SessionState) -> ChatResponse:
    """Ask for the next missing slot, or run the plugin and report its result.

    Slots are normalized for the session's current intent first. If any
    slot is still pending the session moves to ``"waiting_slot"`` and an
    ``ask_slot`` response for the first pending slot is returned. Otherwise
    the intent's plugin is executed with the session slots, the session
    moves to ``"completed"`` and a ``workflow_result`` response is
    returned. The session is saved in both cases.
    """
    assert session.current_intent is not None
    intent = self.intent_registry.get(session.current_intent)
    session.slots = self._normalize_slots_for_intent(intent.intent_id, session.slots)
    missing = self._pending_slots_for_intent(intent, session.slots)

    if not missing:
        plugin_result = self.plugins.execute(intent.plugin_id, session.slots)
        session.pending_slots = []
        session.status = "completed"
        workflow = self._build_workflow(session, [])
        # NOTE(review): _build_workflow has already stored its dump on
        # session.workflow, so this status change only affects the
        # workflow object returned in the response.
        workflow.status = "completed"
        self.session_store.save(session)

        return ChatResponse(
            session_id=session.session_id,
            reply_type="workflow_result",
            reply_text=self.response_policy.workflow_result(intent, plugin_result),
            intent=session.current_intent,
            domain=self._domain_for_intent(session.current_intent),
            status=session.status,
            pending_slots=[],
            filled_slots=session.slots,
            workflow=workflow,
            routing_debug=self._routing_debug_from_session(session),
            trace_id=self._trace_id(),
        )

    session.pending_slots = missing
    session.status = "waiting_slot"
    self.session_store.save(session)
    next_slot = missing[0]
    workflow = self._build_workflow(session, missing)
    question = self.response_policy.ask_for_slot(
        intent,
        next_slot,
        self._default_ask_template(intent, next_slot),
    )
    return ChatResponse(
        session_id=session.session_id,
        reply_type="ask_slot",
        reply_text=question,
        intent=session.current_intent,
        domain=self._domain_for_intent(session.current_intent),
        status=session.status,
        pending_slots=missing,
        filled_slots=session.slots,
        workflow=workflow,
        routing_debug=self._routing_debug_from_session(session),
        trace_id=self._trace_id(),
    )
|
||
|
||
def _build_workflow(self, session: SessionState, pending_slots: list[str]) -> Workflow:
    """Build a one-step "single" workflow for the session's current intent.

    Each pending slot becomes a ``MissingSlot`` whose priority is its
    1-based position in ``pending_slots``. The workflow's dump is stored
    on ``session.workflow`` and the workflow object is returned.
    """
    assert session.current_intent is not None
    intent_id = session.current_intent
    intent = self.intent_registry.get(intent_id)

    missing_slots = [
        MissingSlot(
            slot_name=slot_name,
            ask_template=self._default_ask_template(intent, slot_name),
            priority=priority,
        )
        for priority, slot_name in enumerate(pending_slots, start=1)
    ]

    if pending_slots:
        step_status, workflow_status = "pending", "waiting_slot"
    else:
        step_status, workflow_status = "completed", "ready"

    only_step = WorkflowStep(
        step=1,
        step_id=f"step_{intent_id}_1",
        intent_id=intent_id,
        plugin_id=intent.plugin_id,
        # The action is the last dotted component of the plugin id.
        action=intent.plugin_id.split(".")[-1],
        status=step_status,
        slots=session.slots.copy(),
    )
    workflow = Workflow(
        workflow_id=f"wf_{session.session_id}",
        workflow_type="single",
        domain=intent.domain,
        intent_id=intent_id,
        status=workflow_status,
        risk_level=intent.risk_level,
        slots=session.slots.copy(),
        missing_slots=missing_slots,
        steps=[only_step],
        meta={
            "source": "rule_router",
            "routing_debug": session.routing_debug or {},
        },
    )
    session.workflow = workflow.model_dump()
    return workflow
|
||
|
||
def _fallback_response(self, session_id: str, routing_debug: RoutingDebug | None = None) -> ChatResponse:
    """Build a generic fallback reply.

    The decision and its reason are copied from ``routing_debug`` when one
    is given; both are ``None`` otherwise.
    """
    if routing_debug is None:
        decision = None
        decision_reason = None
    else:
        decision = routing_debug.decision
        decision_reason = routing_debug.decision_reason

    return ChatResponse(
        session_id=session_id,
        reply_type="fallback",
        reply_text=self.response_policy.fallback(),
        decision=decision,
        decision_reason=decision_reason,
        status="fallback",
        routing_debug=routing_debug,
        trace_id=self._trace_id(),
    )
|
||
|
||
def _handle_stop_request(self, session: SessionState, text: str) -> ChatResponse | None:
|
||
if not self._is_stop_request(text) or not self._has_active_task(session):
|
||
return None
|
||
self._reset_active_task(session)
|
||
return ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="text",
|
||
reply_text=self.response_policy.task_stopped(),
|
||
status="stopped",
|
||
pending_slots=[],
|
||
filled_slots={},
|
||
workflow=None,
|
||
routing_debug=None,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
|
||
def _handle_social_turn(self, session: SessionState, text: str) -> ChatResponse | None:
|
||
social_route = self.social_router.route(text, session)
|
||
if social_route.category == "none":
|
||
return None
|
||
if self.social_responder is None:
|
||
reply_text = self.response_policy.open_social_fallback()
|
||
return self._social_response(
|
||
session=session,
|
||
reply_text=reply_text,
|
||
decision="open_social",
|
||
decision_reason="open social matched but no responder configured",
|
||
)
|
||
social_result = self.social_responder.reply(text, session)
|
||
reply_text = social_result.text.strip() or self.response_policy.open_social_fallback()
|
||
return self._social_response(
|
||
session=session,
|
||
reply_text=reply_text,
|
||
decision="open_social",
|
||
decision_reason=social_route.reason,
|
||
)
|
||
|
||
def _social_response(
    self,
    session: SessionState,
    reply_text: str,
    decision: str,
    decision_reason: str,
) -> ChatResponse:
    """Wrap a social reply in a ``ChatResponse`` that mirrors session state.

    Stores ``decision`` as ``last_dialog_mode`` in the session's context
    memory and lets the response policy combine the reply with a hint
    about the pending task. The reported status is the session status,
    except that an ``"idle"`` session is reported as ``"social"``.
    """
    session.context_memory["last_dialog_mode"] = decision

    hint = self.response_policy.pending_task_hint(
        session.status,
        session.pending_slots,
        session.current_intent,
    )
    text_with_hint = self.response_policy.with_pending_hint(reply_text, hint)
    current_workflow = self._workflow_from_session(session)
    reported_status = "social" if session.status == "idle" else session.status

    return ChatResponse(
        session_id=session.session_id,
        reply_type="text",
        reply_text=text_with_hint,
        intent=session.current_intent,
        domain=self._domain_for_intent(session.current_intent),
        decision=decision,
        decision_reason=decision_reason,
        status=reported_status,
        # Copies, so the response does not share the session's containers.
        pending_slots=list(session.pending_slots),
        filled_slots=session.slots.copy(),
        workflow=current_workflow,
        routing_debug=self._routing_debug_from_session(session),
        trace_id=self._trace_id(),
    )
|
||
|
||
def _routing_debug_from_session(self, session: SessionState) -> RoutingDebug | None:
|
||
if session.routing_debug is None:
|
||
return None
|
||
return RoutingDebug.model_validate(session.routing_debug)
|
||
|
||
def _trace_id(self) -> str:
|
||
return uuid4().hex
|
||
|
||
def _domain_for_intent(self, intent_id: str | None) -> str | None:
|
||
"""Return the domain for a given intent_id, or None if unknown."""
|
||
if intent_id is None:
|
||
return None
|
||
try:
|
||
return self.intent_registry.get(intent_id).domain
|
||
except KeyError:
|
||
return None
|
||
|
||
def _planner_indicates_out_of_scope(self, planner_stage: MatcherStageDebug | None) -> bool:
|
||
if planner_stage is None or planner_stage.accepted:
|
||
return False
|
||
reason = str(planner_stage.reason or "").strip().lower()
|
||
if not reason:
|
||
return False
|
||
out_of_scope_signals = (
|
||
"no matching intent",
|
||
"there is no matching intent",
|
||
"not in the catalog",
|
||
"intent catalog only contains",
|
||
"provided intent catalog only contains",
|
||
"out of scope",
|
||
"third-party app action",
|
||
"outside known local capabilities",
|
||
"没有匹配意图",
|
||
"不在意图目录",
|
||
"不在目录中",
|
||
"超出能力范围",
|
||
"能力之外",
|
||
)
|
||
return any(signal in reason for signal in out_of_scope_signals)
|
||
|
||
def _mark_timing(self, breakdown: dict[str, float], name: str, started_at: float) -> None:
|
||
breakdown[name] = round((perf_counter() - started_at) * 1000, 3)
|
||
|
||
def _finalize_response(
|
||
self,
|
||
response: ChatResponse,
|
||
started_at: float,
|
||
breakdown: dict[str, float],
|
||
first_response_at: float | None = None,
|
||
) -> ChatResponse:
|
||
first_ready_at = first_response_at or perf_counter()
|
||
response.first_response_latency_ms = round((first_ready_at - started_at) * 1000, 3)
|
||
response.total_latency_ms = round((perf_counter() - started_at) * 1000, 3)
|
||
response.processing_breakdown = dict(breakdown)
|
||
if response.routing_debug is not None:
|
||
if response.routing_debug.total_match_latency_ms is not None:
|
||
response.processing_breakdown.setdefault(
|
||
"match_pipeline_ms",
|
||
response.routing_debug.total_match_latency_ms,
|
||
)
|
||
return response
|
||
|
||
def _top_candidate_ids(self, routing_debug: RoutingDebug, limit: int = 3) -> list[str]:
|
||
if not routing_debug.stages:
|
||
return []
|
||
fusion_stage = next((stage for stage in reversed(routing_debug.stages) if stage.stage == "fusion"), None)
|
||
if fusion_stage is None:
|
||
return []
|
||
return [candidate.intent_id for candidate in fusion_stage.candidates[:limit]]
|
||
|
||
def _plan_if_needed(
|
||
self,
|
||
session: SessionState,
|
||
text: str,
|
||
routing_debug: RoutingDebug,
|
||
) -> PlanningResult | None:
|
||
if self.planner is None:
|
||
return None
|
||
if not self._should_use_planner(text, routing_debug):
|
||
return None
|
||
planner_started_at = perf_counter()
|
||
result = self.planner.plan(
|
||
text,
|
||
self.intent_registry.list(),
|
||
context={
|
||
"current_intent": session.current_intent,
|
||
"slots": session.slots,
|
||
"context_memory": session.context_memory,
|
||
},
|
||
)
|
||
planner_elapsed_ms = round((perf_counter() - planner_started_at) * 1000, 3)
|
||
self._attach_planner_debug(routing_debug, result, planner_elapsed_ms)
|
||
return result
|
||
|
||
def _should_use_planner(self, text: str, routing_debug: RoutingDebug) -> bool:
|
||
fusion_stage = routing_debug.stages[-1] if routing_debug.stages else None
|
||
has_complex_pattern = any(token in text for token in ("然后", "并且", "同时", "如果", "若", "先", "后", "顺便", "以及"))
|
||
if has_complex_pattern:
|
||
return True
|
||
if self._looks_like_parallel_request(text, fusion_stage):
|
||
return True
|
||
if text.count(",") >= 1 or text.count(",") >= 1 or text.count(";") >= 1:
|
||
return True
|
||
if fusion_stage is not None and fusion_stage.stage == "fusion" and not fusion_stage.accepted:
|
||
return True
|
||
return False
|
||
|
||
def _attach_planner_debug(
    self,
    routing_debug: RoutingDebug,
    planning_result: PlanningResult,
    elapsed_ms: float | None = None,
) -> None:
    """Append a ``"planner"`` stage describing ``planning_result`` to ``routing_debug``.

    Args:
        routing_debug: Routing trace whose ``stages`` list is extended in place.
        planning_result: Planner output to summarise.
        elapsed_ms: Planner run time to record on the stage, if measured.
    """
    planned_steps = planning_result.steps
    planner_metadata = planning_result.metadata

    stage_metadata = {
        "workflow_type": planning_result.workflow_type,
        "raw_response": planning_result.raw_response,
    }
    # Planner metadata is merged last, so it wins over the two keys above on a clash.
    stage_metadata.update(planner_metadata)

    normalized_steps = planner_metadata.get("normalized_steps", [])
    candidates = []
    # Only the first five planned steps are listed as candidates.
    for index, step in enumerate(planned_steps[:5]):
        if index < len(normalized_steps):
            candidate_metadata = normalized_steps[index]
        else:
            # No normalized entry for this position: fall back to the step's raw slots.
            candidate_metadata = {"slots": step.slots}
        candidates.append(
            IntentCandidate(
                intent_id=step.intent_id,
                score=1.0,
                reason=step.reason,
                model_name=planning_result.model_name,
                metadata=candidate_metadata,
            )
        )

    planner_stage = MatcherStageDebug(
        stage="planner",
        accepted=planning_result.accepted,
        selected_intent=planned_steps[0].intent_id if planned_steps else None,
        # The stage score carries the number of planned steps.
        score=float(len(planned_steps)),
        elapsed_ms=elapsed_ms,
        reason=planning_result.reason,
        model_name=planning_result.model_name,
        backend=planning_result.backend,
        error_message=planning_result.error_message,
        metadata=stage_metadata,
        candidates=candidates,
    )
    routing_debug.stages.append(planner_stage)
|
||
|
||
def _start_planned_workflow(
|
||
self,
|
||
session: SessionState,
|
||
planning_result: PlanningResult,
|
||
routing_debug: RoutingDebug,
|
||
user_text: str,
|
||
rewrite_result: RewriteResult,
|
||
) -> ChatResponse:
|
||
workflow = self._build_planned_workflow(session, planning_result)
|
||
session.workflow = workflow.model_dump()
|
||
session.routing_debug = routing_debug.model_dump()
|
||
response = self._continue_planned_workflow(session, workflow, routing_debug)
|
||
self._record_turn(session, user_text, response.reply_text, rewrite_result)
|
||
return response
|
||
|
||
def _build_planned_workflow(self, session: SessionState, planning_result: PlanningResult) -> Workflow:
    """Turn a planner result into a ``Workflow`` with one pending step per planned step.

    The workflow's domain, risk level and headline intent come from the first
    planned step, so ``planning_result.steps`` must not be empty.

    Args:
        session: Supplies the session id used in workflow/step ids and the
            slots copied onto the workflow.
        planning_result: Planner output to convert.

    Returns:
        A workflow in status ``"ready"`` whose steps are all ``"pending"``.
    """
    registry = self.intent_registry
    planned_steps = planning_result.steps
    lead_intent_id = planned_steps[0].intent_id
    domain = registry.get(lead_intent_id).domain
    risk_level = registry.get(lead_intent_id).risk_level

    # Planned steps refer to each other by 1-based position; map each position
    # to the step id used inside the workflow.
    step_ids = {
        position: f"step_{session.session_id}_{position}"
        for position in range(1, len(planned_steps) + 1)
    }

    def to_workflow_step(position, planned):
        intent = registry.get(planned.intent_id)
        # Dependencies that point at unknown positions are dropped.
        dependency_ids = [step_ids[dep] for dep in planned.depends_on if dep in step_ids]
        return WorkflowStep(
            step=position,
            step_id=step_ids[position],
            intent_id=planned.intent_id,
            plugin_id=intent.plugin_id,
            # The action name is the last dotted segment of the plugin id.
            action=intent.plugin_id.split(".")[-1],
            status="pending",
            depends_on=dependency_ids,
            slots=dict(planned.slots),
            condition=dict(planned.condition),
            requires_confirmation=planned.requires_confirmation or self._requires_confirmation(intent),
        )

    steps = [
        to_workflow_step(position, planned)
        for position, planned in enumerate(planned_steps, start=1)
    ]

    known_types = {"single", "sequence", "conditional", "parallel"}
    if planning_result.workflow_type in known_types:
        workflow_type = planning_result.workflow_type
    else:
        # Unrecognised planner types are treated as a plain sequence.
        workflow_type = "sequence"

    return Workflow(
        workflow_id=f"wf_{session.session_id}",
        workflow_type=workflow_type,
        domain=domain,
        intent_id=lead_intent_id,
        status="ready",
        risk_level=risk_level,
        slots=session.slots.copy(),
        missing_slots=[],
        steps=steps,
        meta={
            "source": planning_result.backend,
            "planner_model": planning_result.model_name,
            "planner_reason": planning_result.reason,
            "planner_debug": planning_result.metadata,
            "step_results": {},
            "confirmed_steps": [],
        },
    )
|
||
|
||
def _continue_planned_workflow(
    self,
    session: SessionState,
    workflow: Workflow,
    routing_debug: RoutingDebug | None,
    carry_messages: list[str] | None = None,
) -> ChatResponse:
    """Run the workflow's remaining steps in order until one needs user input.

    For each step that is not yet completed/skipped and whose dependencies are
    all completed, the method merges slots, then either asks for a missing
    slot, skips the step because its condition evaluated to ``False``, asks
    for confirmation, or executes the step's plugin.

    Args:
        session: Session state; its slots, status, pending slots, current
            intent and serialized workflow are updated in place.
        workflow: Workflow to advance; mutated in place.
        routing_debug: Routing trace attached to the returned response.
        carry_messages: Messages to place ahead of the ones produced here
            in the final summary.

    Returns:
        An ``ask_slot`` or ``ask_confirmation`` response when a step needs
        input, otherwise a ``workflow_result`` response summarising all
        step messages.
    """
    step_messages: list[str] = list(carry_messages or [])
    step_results = workflow.meta.setdefault("step_results", {})
    confirmed_steps = workflow.meta.setdefault("confirmed_steps", [])
    for step in workflow.steps:
        if step.status in {"completed", "skipped"}:
            continue
        # A step with any dependency that is not "completed" is passed over untouched.
        # NOTE(review): such steps keep their previous status even though the workflow
        # is marked "completed" after the loop - confirm this is intended.
        if step.depends_on and not all(self._is_step_completed(workflow, dependency) for dependency in step.depends_on):
            continue
        intent = self.intent_registry.get(step.intent_id)
        # Session slots form the base; the step's own slots override them.
        merged_slots = session.slots.copy()
        merged_slots.update(step.slots)
        merged_slots = self._normalize_slots_for_intent(intent.intent_id, merged_slots)
        step.slots = merged_slots.copy()
        session.current_intent = intent.intent_id
        missing_slots = self._pending_slots_for_intent(intent, merged_slots)
        if missing_slots:
            # Pause the workflow and ask for the first missing slot.
            workflow.status = "waiting_slot"
            session.pending_slots = missing_slots
            session.status = "waiting_slot"
            workflow.missing_slots = [
                MissingSlot(
                    slot_name=slot,
                    ask_template=self._default_ask_template(intent, slot),
                    priority=index + 1,
                )
                for index, slot in enumerate(missing_slots)
            ]
            session.workflow = workflow.model_dump()
            return ChatResponse(
                session_id=session.session_id,
                reply_type="ask_slot",
                reply_text=self.response_policy.ask_for_slot(
                    intent,
                    missing_slots[0],
                    self._default_ask_template(intent, missing_slots[0]),
                ),
                intent=intent.intent_id,
                domain=intent.domain,
                status=session.status,
                pending_slots=missing_slots,

                filled_slots=merged_slots,
                workflow=workflow,
                routing_debug=routing_debug,
                trace_id=self._trace_id(),
            )

        # Only an explicit False skips the step; None (no verdict) lets it proceed.
        condition_state = self._evaluate_step_condition(step, workflow)
        if condition_state is False:
            step.status = "skipped"
            reason = step.condition.get("description") if step.condition else None
            step_messages.append(self.response_policy.step_skipped(intent, reason))
            continue

        if step.requires_confirmation and step.step_id not in confirmed_steps:
            # Pause the workflow and record which step is awaiting the user's decision.
            step.status = "waiting_confirmation"
            workflow.status = "waiting_confirmation"
            session.pending_slots = ["confirmation"]
            session.status = "waiting_confirmation"
            workflow.meta["pending_confirmation"] = {
                "step_id": step.step_id,
                "intent_id": intent.intent_id,
                "detail": step.condition.get("description") if step.condition else None,
            }
            session.workflow = workflow.model_dump()
            return ChatResponse(
                session_id=session.session_id,
                reply_type="ask_confirmation",
                reply_text=self.response_policy.ask_for_confirmation(
                    intent,
                    # A blank or missing detail is passed as None.
                    str(workflow.meta["pending_confirmation"].get("detail") or "").strip() or None,
                ),
                intent=intent.intent_id,
                domain=intent.domain,
                status=session.status,
                pending_slots=["confirmation"],

                filled_slots=merged_slots,
                workflow=workflow,
                routing_debug=routing_debug,
                trace_id=self._trace_id(),
            )

        # Execute the step and fold its outcome into workflow and session state.
        plugin_result = self.plugins.execute(intent.plugin_id, merged_slots)
        step.status = "completed"
        step_results[step.step_id] = plugin_result
        session.slots.update(merged_slots)
        self._update_context_memory(session, intent.intent_id, merged_slots)
        step_messages.append(self.response_policy.workflow_result(intent, plugin_result))

    # Every step has been visited without pausing: close the workflow.
    workflow.status = "completed"
    workflow.missing_slots = []
    session.pending_slots = []
    session.status = "completed"
    session.current_intent = workflow.intent_id
    session.workflow = workflow.model_dump()
    return ChatResponse(
        session_id=session.session_id,
        reply_type="workflow_result",
        reply_text=self.response_policy.workflow_summary(step_messages),
        intent=workflow.intent_id,
        domain=self._domain_for_intent(workflow.intent_id),
        status=session.status,
        pending_slots=[],

        filled_slots=session.slots,
        workflow=workflow,
        routing_debug=routing_debug,
        trace_id=self._trace_id(),
    )
|
||
|
||
def _requires_confirmation(self, intent) -> bool:
|
||
return self.dialog_rules.requires_confirmation(intent.intent_id, intent.risk_level)
|
||
|
||
def _is_step_completed(self, workflow: Workflow, step_id: str) -> bool:
|
||
dependency = next((item for item in workflow.steps if item.step_id == step_id), None)
|
||
return dependency is not None and dependency.status == "completed"
|
||
|
||
def _evaluate_step_condition(self, step: WorkflowStep, workflow: Workflow) -> bool | None:
|
||
if not step.condition:
|
||
return None
|
||
source_step_id = self._resolve_condition_source_step(step, workflow)
|
||
if source_step_id is None:
|
||
return None
|
||
step_results = workflow.meta.get("step_results", {})
|
||
source_result = step_results.get(source_step_id, {})
|
||
if not isinstance(source_result, dict):
|
||
return None
|
||
field_name = str(step.condition.get("field", "")).strip()
|
||
operator = str(step.condition.get("operator", "equals")).strip() or "equals"
|
||
expected = step.condition.get("value")
|
||
actual = source_result.get("data", {}).get(field_name)
|
||
if operator == "equals":
|
||
return actual == expected
|
||
if operator == "not_equals":
|
||
return actual != expected
|
||
if operator == "in" and isinstance(expected, list):
|
||
return actual in expected
|
||
return None
|
||
|
||
def _resolve_condition_source_step(self, step: WorkflowStep, workflow: Workflow) -> str | None:
|
||
source_step = step.condition.get("source_step")
|
||
try:
|
||
if source_step is not None:
|
||
source_index = int(source_step)
|
||
matched = next((item for item in workflow.steps if item.step == source_index), None)
|
||
if matched is not None:
|
||
return matched.step_id
|
||
except (TypeError, ValueError):
|
||
return None
|
||
if step.depends_on:
|
||
return step.depends_on[0]
|
||
return None
|
||
|
||
def _handle_confirmation(self, session: SessionState, text: str) -> ChatResponse:
|
||
workflow = self._workflow_from_session(session)
|
||
routing_debug = self._routing_debug_from_session(session)
|
||
if workflow is None:
|
||
return self._fallback_response(session.session_id, routing_debug)
|
||
|
||
pending = workflow.meta.get("pending_confirmation", {})
|
||
step_id = str(pending.get("step_id", "")).strip()
|
||
if not step_id:
|
||
return self._fallback_response(session.session_id, routing_debug)
|
||
|
||
decision = self._parse_confirmation_decision(text)
|
||
step = next((item for item in workflow.steps if item.step_id == step_id), None)
|
||
if step is None:
|
||
return self._fallback_response(session.session_id, routing_debug)
|
||
intent = self.intent_registry.get(step.intent_id)
|
||
|
||
if decision is None:
|
||
return ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="ask_confirmation",
|
||
reply_text=self.response_policy.confirm_retry(),
|
||
intent=intent.intent_id,
|
||
domain=intent.domain,
|
||
status=session.status,
|
||
pending_slots=["confirmation"],
|
||
|
||
filled_slots=session.slots,
|
||
workflow=workflow,
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|
||
|
||
workflow.meta.pop("pending_confirmation", None)
|
||
if decision is False:
|
||
step.status = "skipped"
|
||
session.pending_slots = []
|
||
session.status = "running"
|
||
skipped_reason = self.response_policy.confirm_cancelled()
|
||
step_messages = [skipped_reason]
|
||
response = self._continue_planned_workflow(session, workflow, routing_debug, carry_messages=step_messages)
|
||
return response
|
||
|
||
confirmed_steps = workflow.meta.setdefault("confirmed_steps", [])
|
||
if step_id not in confirmed_steps:
|
||
confirmed_steps.append(step_id)
|
||
session.pending_slots = []
|
||
session.status = "running"
|
||
return self._continue_planned_workflow(session, workflow, routing_debug)
|
||
|
||
def _workflow_from_session(self, session: SessionState) -> Workflow | None:
|
||
if session.workflow is None:
|
||
return None
|
||
return Workflow.model_validate(session.workflow)
|
||
|
||
def _attach_rewrite_debug(
    self,
    routing_debug: RoutingDebug,
    rewrite_result: RewriteResult,
    elapsed_ms: float | None = None,
) -> None:
    """Insert a ``"rewrite"`` stage at the front of ``routing_debug.stages``.

    Args:
        routing_debug: Routing trace to extend in place.
        rewrite_result: Outcome of the context rewrite for this turn.
        elapsed_ms: Rewrite run time to record on the stage, if measured.
    """
    was_applied = rewrite_result.applied

    stage_metadata = {
        "original_text": rewrite_result.original_text,
        "rewritten_text": rewrite_result.rewritten_text,
    }
    # The engine's own metadata is merged last, so it wins on a key clash.
    stage_metadata.update(rewrite_result.metadata)

    rewrite_stage = MatcherStageDebug(
        stage="rewrite",
        accepted=was_applied,
        score=1.0 if was_applied else 0.0,
        elapsed_ms=elapsed_ms,
        reason=rewrite_result.reason or "no rewrite needed",
        model_name="context-rewrite",
        metadata=stage_metadata,
    )
    # Position 0: the rewrite stage is listed ahead of the stages already recorded.
    routing_debug.stages.insert(0, rewrite_stage)
|
||
|
||
def _update_context_memory(
|
||
self,
|
||
session: SessionState,
|
||
intent_id: str,
|
||
extracted_slots: dict[str, object],
|
||
) -> None:
|
||
session.context_memory["last_intent"] = intent_id
|
||
if "temperature" in extracted_slots:
|
||
session.context_memory["last_temperature"] = extracted_slots["temperature"]
|
||
if "destination" in extracted_slots:
|
||
session.context_memory["last_destination"] = extracted_slots["destination"]
|
||
if "song" in extracted_slots:
|
||
session.context_memory["last_song"] = extracted_slots["song"]
|
||
if "genre" in extracted_slots:
|
||
session.context_memory["last_genre"] = extracted_slots["genre"]
|
||
|
||
def _normalize_slots_for_intent(self, intent_id: str, slots: dict[str, object]) -> dict[str, object]:
|
||
normalized = dict(slots)
|
||
if intent_id == "cabin_set_ac":
|
||
temperature = self._normalize_temperature_value(normalized.get("temperature"))
|
||
if temperature is not None:
|
||
normalized["temperature"] = temperature
|
||
else:
|
||
normalized.pop("temperature", None)
|
||
if intent_id == "cabin_play_music":
|
||
media_query = str(normalized.get("media_query") or "").strip()
|
||
if media_query and "song" not in normalized and "genre" not in normalized:
|
||
if any(genre in media_query for genre in self._MUSIC_GENRE_HINTS):
|
||
matched = next((genre for genre in self._MUSIC_GENRE_HINTS if genre in media_query), None)
|
||
if matched:
|
||
normalized["genre"] = matched
|
||
else:
|
||
normalized["song"] = media_query
|
||
normalized.pop("media_query", None)
|
||
return normalized
|
||
|
||
def _continue_active_task(self, session: SessionState, text: str) -> ChatResponse | None:
|
||
if session.current_intent is None:
|
||
return None
|
||
if session.status not in {"waiting_slot", "waiting_confirmation"}:
|
||
return None
|
||
if session.status == "waiting_confirmation":
|
||
return None
|
||
intent = self.intent_registry.get(session.current_intent)
|
||
extracted_slots = self._extract_continuation_slots(session, intent, text)
|
||
if not extracted_slots:
|
||
return None
|
||
session.slots.update(extracted_slots)
|
||
self._update_context_memory(session, intent.intent_id, extracted_slots)
|
||
if session.routing_debug is not None:
|
||
routing_debug = RoutingDebug.model_validate(session.routing_debug)
|
||
routing_debug.extracted_slots = session.slots.copy()
|
||
session.routing_debug = routing_debug.model_dump()
|
||
existing_workflow = self._workflow_from_session(session)
|
||
if existing_workflow is not None and existing_workflow.workflow_type != "single":
|
||
return self._continue_planned_workflow(
|
||
session,
|
||
existing_workflow,
|
||
self._routing_debug_from_session(session),
|
||
)
|
||
return self._build_response_from_session(session)
|
||
|
||
def _extract_continuation_slots(
|
||
self,
|
||
session: SessionState,
|
||
intent,
|
||
text: str,
|
||
) -> dict[str, object]:
|
||
extracted = self.router.extract_slots(text, intent)
|
||
if extracted:
|
||
return extracted
|
||
normalized = text.strip()
|
||
if not normalized:
|
||
return {}
|
||
candidate_texts = self._continuation_candidate_texts(intent.intent_id, normalized, session.pending_slots)
|
||
merged: dict[str, object] = {}
|
||
for candidate_text in candidate_texts:
|
||
candidate_slots = self.router.extract_slots(candidate_text, intent)
|
||
if candidate_slots:
|
||
merged.update(candidate_slots)
|
||
break
|
||
if merged:
|
||
return merged
|
||
if intent.intent_id == "cabin_play_music" and self._requires_music_query(session.pending_slots):
|
||
compact = normalized.strip(",。,.!?!? ")
|
||
if not compact:
|
||
return {}
|
||
if self.dialog_act_engine.detect(compact) in {"thanks", "greeting", "goodbye"}:
|
||
return {}
|
||
matched_genre = next((genre for genre in self._MUSIC_GENRE_HINTS if genre in compact), None)
|
||
if matched_genre is not None:
|
||
return {"genre": matched_genre}
|
||
if len(compact) <= 20:
|
||
return {"song": compact}
|
||
return {}
|
||
|
||
def _continuation_candidate_texts(
|
||
self,
|
||
intent_id: str,
|
||
text: str,
|
||
pending_slots: list[str],
|
||
) -> list[str]:
|
||
pending = list(pending_slots)
|
||
if intent_id == "cabin_play_music" and self._requires_music_query(pending):
|
||
return [f"播放{text}", f"来一首{text}", f"来点{text}"]
|
||
if intent_id == "cabin_nav_to" and "destination" in pending:
|
||
return [f"导航去{text}", f"带我去{text}"]
|
||
if intent_id == "cabin_set_ac" and "temperature" in pending:
|
||
return [f"把空调调到{text}", f"温度设成{text}"]
|
||
if intent_id in {"cs_query_order", "cs_query_logistics", "cs_cancel_order"} and "order_id" in pending:
|
||
prefix = {
|
||
"cs_query_order": "查一下订单",
|
||
"cs_query_logistics": "查一下物流",
|
||
"cs_cancel_order": "取消订单",
|
||
}.get(intent_id, "")
|
||
return [f"{prefix}{text}"] if prefix else [text]
|
||
return []
|
||
|
||
def _pending_slots_for_intent(self, intent, slots: dict[str, object]) -> list[str]:
|
||
pending_slots = [slot for slot in intent.required_slots if slot not in slots]
|
||
if intent.intent_id == "cabin_play_music" and not self._has_music_query(slots):
|
||
pending_slots.append("media_query")
|
||
deduped: list[str] = []
|
||
for slot in pending_slots:
|
||
if slot not in deduped:
|
||
deduped.append(slot)
|
||
return deduped
|
||
|
||
def _default_ask_template(self, intent, slot_name: str) -> str:
|
||
if slot_name in intent.ask_templates:
|
||
return str(intent.ask_templates[slot_name])
|
||
if slot_name == "media_query":
|
||
return "想听什么风格或者具体的歌名?"
|
||
return "请补充一个关键信息。"
|
||
|
||
def _has_music_query(self, slots: dict[str, object]) -> bool:
|
||
return bool(str(slots.get("song") or "").strip() or str(slots.get("genre") or "").strip())
|
||
|
||
def _requires_music_query(self, pending_slots: list[str]) -> bool:
|
||
return "media_query" in pending_slots or not pending_slots
|
||
|
||
def _looks_like_parallel_request(self, text: str, fusion_stage: MatcherStageDebug | None) -> bool:
|
||
_ = fusion_stage
|
||
compact = re.sub(r"\s+", "", text)
|
||
return bool(re.search(r"(和|跟|以及|外加).+", compact) and len(compact) >= 5)
|
||
|
||
def _normalize_temperature_value(self, raw_value: object) -> int | None:
|
||
if isinstance(raw_value, bool):
|
||
return None
|
||
if isinstance(raw_value, (int, float)):
|
||
value = int(raw_value)
|
||
elif isinstance(raw_value, str):
|
||
match = re.search(r"-?\d+", raw_value)
|
||
if not match:
|
||
return None
|
||
value = int(match.group(0))
|
||
else:
|
||
return None
|
||
return max(16, min(30, value))
|
||
|
||
def _record_turn(
|
||
self,
|
||
session: SessionState,
|
||
user_text: str,
|
||
agent_text: str,
|
||
rewrite_result: RewriteResult,
|
||
) -> None:
|
||
session.last_user_text = user_text
|
||
session.last_agent_text = agent_text
|
||
session.context_memory["last_raw_user_text"] = user_text
|
||
session.context_memory["last_rewritten_text"] = rewrite_result.rewritten_text
|
||
|
||
def _update_dialog_act(self, session: SessionState, text: str) -> None:
|
||
session.context_memory["last_dialog_act"] = self.dialog_act_engine.detect(text)
|
||
|
||
def _has_active_task(self, session: SessionState) -> bool:
|
||
if session.status in {"waiting_slot", "waiting_confirmation", "running", "understanding"}:
|
||
return True
|
||
workflow = self._workflow_from_session(session)
|
||
if workflow is not None and workflow.status not in {"completed", "failed"}:
|
||
return True
|
||
return False
|
||
|
||
def _is_stop_request(self, text: str) -> bool:
|
||
return self.dialog_rules.is_stop_request(text)
|
||
|
||
def _reset_active_task(self, session: SessionState) -> None:
|
||
session.status = "idle"
|
||
session.current_intent = None
|
||
session.pending_slots = []
|
||
session.slots = {}
|
||
session.workflow = None
|
||
session.routing_debug = None
|
||
|
||
def _parse_confirmation_decision(self, text: str) -> bool | None:
|
||
return self.dialog_rules.parse_confirmation_decision(text)
|
||
|
||
def _try_knowledge_llm(
|
||
self,
|
||
session: SessionState,
|
||
user_text: str,
|
||
routing_debug: RoutingDebug,
|
||
rewrite_result: RewriteResult,
|
||
) -> ChatResponse | None:
|
||
"""BERT 未命中时尝试 LLM + knowledge_search function call。
|
||
有结果返回 ChatResponse(含 knowledge_content),无结果返回 None。
|
||
"""
|
||
if self.knowledge_llm is None:
|
||
return None
|
||
|
||
result = self.knowledge_llm.reply(user_text)
|
||
|
||
# LLM 调用失败或本地兜底但没有文档 → 退回原始 fallback
|
||
if result.backend == "local-fallback" and result.doc_id is None:
|
||
return None
|
||
|
||
return ChatResponse(
|
||
session_id=session.session_id,
|
||
reply_type="text",
|
||
reply_text=result.reply_text,
|
||
intent=None,
|
||
domain="knowledge",
|
||
decision="knowledge_llm",
|
||
decision_reason=(
|
||
f"BERT 未命中,LLM 通过 knowledge_search 命中文档:{result.doc_id}"
|
||
if result.doc_id
|
||
else "BERT 未命中,LLM 自主回答"
|
||
),
|
||
status="completed",
|
||
knowledge_doc_id=result.doc_id,
|
||
knowledge_doc_title=result.doc_title,
|
||
knowledge_content=result.doc_content,
|
||
routing_debug=routing_debug,
|
||
trace_id=self._trace_id(),
|
||
)
|