from __future__ import annotations import re from time import perf_counter from uuid import uuid4 from app.plugins.base import PluginRegistry from app.schemas.chat import ChatRequest, ChatResponse, FillSlotsRequest from app.schemas.debug import IntentCandidate, MatcherStageDebug, RoutingDebug from app.schemas.workflow import MissingSlot, Workflow, WorkflowStep from app.services.dialog_act import DialogActEngine from app.services.dialog_rules import DialogRuleEngine from app.services.intent_registry import IntentRegistry from app.services.knowledge_llm import DashScopeKnowledgeLLM from app.services.planner import PlanningResult, WorkflowPlanner from app.services.response_policy import ResponsePolicy from app.services.rewrite_engine import ContextRewriteEngine, RewriteResult from app.services.router import Router from app.services.session_store import InMemorySessionStore, SessionState from app.services.social import SocialResponder, SocialRouter class AgentService: _MUSIC_GENRE_HINTS = ("轻音乐", "摇滚", "古典", "民谣", "爵士", "流行", "儿歌") def __init__( self, intent_registry: IntentRegistry, router: Router, plugins: PluginRegistry, session_store: InMemorySessionStore | None = None, rewrite_engine: ContextRewriteEngine | None = None, response_policy: ResponsePolicy | None = None, dialog_rules: DialogRuleEngine | None = None, dialog_act_engine: DialogActEngine | None = None, planner: WorkflowPlanner | None = None, social_router: SocialRouter | None = None, social_responder: SocialResponder | None = None, knowledge_llm: DashScopeKnowledgeLLM | None = None, ) -> None: self.session_store = session_store or InMemorySessionStore() self.intent_registry = intent_registry self.router = router self.plugins = plugins self.rewrite_engine = rewrite_engine or ContextRewriteEngine() self.response_policy = response_policy or ResponsePolicy() self.dialog_rules = dialog_rules or DialogRuleEngine() self.dialog_act_engine = dialog_act_engine or DialogActEngine() self.planner = planner self.social_router = social_router or SocialRouter() self.social_responder = social_responder self.knowledge_llm = knowledge_llm def handle_chat(self, request: ChatRequest) -> ChatResponse: started_at = perf_counter() breakdown: dict[str, float] = {} timer_started_at = perf_counter() session = self.session_store.get_or_create( session_id=request.session_id, user_id=request.user_id, channel=request.channel, ) self._mark_timing(breakdown, "session_get_or_create_ms", timer_started_at) timer_started_at = perf_counter() self._update_dialog_act(session, request.input_text) self._mark_timing(breakdown, "dialog_act_ms", timer_started_at) timer_started_at = perf_counter() stop_response = self._handle_stop_request(session, request.input_text) self._mark_timing(breakdown, "stop_check_ms", timer_started_at) if stop_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, stop_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(stop_response, started_at, breakdown, first_response_at) if session.status == "waiting_confirmation": confirmation = self._parse_confirmation_decision(request.input_text) if confirmation is not None: timer_started_at = perf_counter() response = self._handle_confirmation(session, request.input_text) self._mark_timing(breakdown, "confirmation_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() continued_response = self._continue_active_task(session, request.input_text) self._mark_timing(breakdown, "active_task_ms", timer_started_at) if continued_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, continued_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(continued_response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() social_response = self._handle_social_turn(session, request.input_text) self._mark_timing(breakdown, "social_route_ms", timer_started_at) if social_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, social_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(social_response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() rewrite_result = self.rewrite_engine.rewrite(request.input_text, session) rewrite_elapsed_ms = round((perf_counter() - timer_started_at) * 1000, 3) breakdown["rewrite_ms"] = rewrite_elapsed_ms timer_started_at = perf_counter() route_result = self.router.route(rewrite_result.rewritten_text) self._mark_timing(breakdown, "route_ms", timer_started_at) self._attach_rewrite_debug(route_result.debug, rewrite_result, rewrite_elapsed_ms) timer_started_at = perf_counter() # ── BERT 信号不稳时,知识库优先检索(在 Planner 之前)────────────────── # decision=route_to_cloud 表示 BERT 低置信,需发给 cloud planner # 但如果本地 MD 知识库关键词已命中,直接走 knowledge_llm,跳过 cloud planner if route_result.debug.decision == "route_to_cloud" and self.knowledge_llm is not None: quick_hits = self.knowledge_llm._store.search(rewrite_result.rewritten_text, top_k=1) if quick_hits and quick_hits[0].score >= 3.0: # 至少命中一个 title 或多个 body 词 early_knowledge = self._try_knowledge_llm( session, request.input_text, route_result.debug, rewrite_result ) if early_knowledge is not None: self._mark_timing(breakdown, "knowledge_llm_ms", timer_started_at) first_response_at = perf_counter() self._record_turn(session, request.input_text, early_knowledge.reply_text, rewrite_result) self.session_store.save(session) return self._finalize_response(early_knowledge, started_at, breakdown, first_response_at) planning_result = self._plan_if_needed(session, rewrite_result.rewritten_text, route_result.debug) self._mark_timing(breakdown, "planner_ms", timer_started_at) if planning_result is not None and planning_result.accepted and planning_result.steps: timer_started_at = perf_counter() response = self._start_planned_workflow(session, planning_result, route_result.debug, request.input_text, rewrite_result) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() decision_response = self._handle_route_decision(session, route_result.debug, request.input_text, rewrite_result) self._mark_timing(breakdown, "decision_response_ms", timer_started_at) if decision_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(decision_response, started_at, breakdown, first_response_at) intent = route_result.intent if intent is None: timer_started_at = perf_counter() # ── BERT 未命中:尝试 LLM + knowledge_search function call 兆底 ── knowledge_response = self._try_knowledge_llm( session, request.input_text, route_result.debug, rewrite_result ) if knowledge_response is not None: self._mark_timing(breakdown, "knowledge_llm_ms", timer_started_at) first_response_at = perf_counter() self._record_turn(session, request.input_text, knowledge_response.reply_text, rewrite_result) self.session_store.save(session) return self._finalize_response(knowledge_response, started_at, breakdown, first_response_at) response = self._fallback_response(session.session_id, route_result.debug) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, rewrite_result) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) if session.current_intent != intent.intent_id: session.pending_slots = [] session.slots = {} session.workflow = None session.current_intent = intent.intent_id session.status = "understanding" timer_started_at = perf_counter() extracted_slots = self.router.extract_slots(rewrite_result.rewritten_text, intent) self._mark_timing(breakdown, "slot_extract_ms", timer_started_at) route_result.debug.extracted_slots = extracted_slots.copy() session.routing_debug = route_result.debug.model_dump() session.slots.update(extracted_slots) self._update_context_memory(session, intent.intent_id, extracted_slots) timer_started_at = perf_counter() response = self._build_response_from_session(session) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, rewrite_result) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) def _handle_route_decision( self, session: SessionState, routing_debug: RoutingDebug, user_text: str, rewrite_result: RewriteResult, ) -> ChatResponse | None: if routing_debug.decision == "execute": return None if routing_debug.decision == "clarify": response = ChatResponse( session_id=session.session_id, reply_type="clarify", reply_text=self.response_policy.clarify(self._top_candidate_ids(routing_debug)), intent=routing_debug.selected_intent, domain=self._domain_for_intent(routing_debug.selected_intent), decision=routing_debug.decision, decision_reason=routing_debug.decision_reason, status="clarify", routing_debug=routing_debug, trace_id=self._trace_id(), ) self._record_turn(session, user_text, response.reply_text, rewrite_result) return response if routing_debug.decision == "reject": response = ChatResponse( session_id=session.session_id, reply_type="reject", reply_text=self.response_policy.reject(), decision=routing_debug.decision, decision_reason=routing_debug.decision_reason, status="rejected", routing_debug=routing_debug, trace_id=self._trace_id(), ) self._record_turn(session, user_text, response.reply_text, rewrite_result) return response if routing_debug.decision == "route_to_cloud": candidate_ids = self._top_candidate_ids(routing_debug) planner_stage = next((stage for stage in reversed(routing_debug.stages) if stage.stage == "planner"), None) planner_reason = planner_stage.reason if planner_stage is not None else routing_debug.decision_reason if self._planner_indicates_out_of_scope(planner_stage): response = ChatResponse( session_id=session.session_id, reply_type="reject", reply_text=self.response_policy.reject(), intent=routing_debug.selected_intent, domain=self._domain_for_intent(routing_debug.selected_intent), decision="reject", decision_reason=planner_reason, status="rejected", routing_debug=routing_debug, trace_id=self._trace_id(), ) elif candidate_ids: response = ChatResponse( session_id=session.session_id, reply_type="clarify", reply_text=self.response_policy.clarify(candidate_ids), intent=routing_debug.selected_intent, domain=self._domain_for_intent(routing_debug.selected_intent), decision=routing_debug.decision, decision_reason=planner_reason, status="route_to_cloud", routing_debug=routing_debug, trace_id=self._trace_id(), ) else: response = ChatResponse( session_id=session.session_id, reply_type="fallback", reply_text=self.response_policy.fallback(), intent=routing_debug.selected_intent, domain=self._domain_for_intent(routing_debug.selected_intent), decision=routing_debug.decision, decision_reason=planner_reason, status="route_to_cloud", routing_debug=routing_debug, trace_id=self._trace_id(), ) self._record_turn(session, user_text, response.reply_text, rewrite_result) return response return None def handle_fill_slots(self, request: FillSlotsRequest) -> ChatResponse: started_at = perf_counter() breakdown: dict[str, float] = {} timer_started_at = perf_counter() session = self.session_store.get(request.session_id) self._mark_timing(breakdown, "session_get_ms", timer_started_at) if session is None or session.current_intent is None: timer_started_at = perf_counter() response = ChatResponse( session_id=request.session_id, reply_type="fallback", reply_text=self.response_policy.fallback(), status="fallback", routing_debug=None, trace_id=self._trace_id(), ) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() if session is not None: timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() self._update_dialog_act(session, request.input_text) self._mark_timing(breakdown, "dialog_act_ms", timer_started_at) timer_started_at = perf_counter() stop_response = self._handle_stop_request(session, request.input_text) self._mark_timing(breakdown, "stop_check_ms", timer_started_at) if stop_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, stop_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(stop_response, started_at, breakdown, first_response_at) if session.status == "waiting_confirmation": if self._parse_confirmation_decision(request.input_text) is not None: timer_started_at = perf_counter() response = self._handle_confirmation(session, request.input_text) self._mark_timing(breakdown, "confirmation_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() continued_response = self._continue_active_task(session, request.input_text) self._mark_timing(breakdown, "active_task_ms", timer_started_at) if continued_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, continued_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(continued_response, started_at, breakdown, first_response_at) timer_started_at = perf_counter() social_response = self._handle_social_turn(session, request.input_text) self._mark_timing(breakdown, "social_route_ms", timer_started_at) if social_response is not None: first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, social_response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(social_response, started_at, breakdown, first_response_at) if session.status == "waiting_confirmation": timer_started_at = perf_counter() response = self._handle_confirmation(session, request.input_text) self._mark_timing(breakdown, "confirmation_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) existing_workflow = self._workflow_from_session(session) if existing_workflow is not None and existing_workflow.workflow_type != "single": intent = self.intent_registry.get(session.current_intent) timer_started_at = perf_counter() extracted_slots = self._extract_continuation_slots(session, intent, request.input_text) self._mark_timing(breakdown, "slot_extract_ms", timer_started_at) session.slots.update(extracted_slots) self._update_context_memory(session, intent.intent_id, extracted_slots) timer_started_at = perf_counter() response = self._continue_planned_workflow( session, existing_workflow, self._routing_debug_from_session(session), ) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) intent = self.intent_registry.get(session.current_intent) timer_started_at = perf_counter() extracted_slots = self._extract_continuation_slots(session, intent, request.input_text) self._mark_timing(breakdown, "slot_extract_ms", timer_started_at) session.slots.update(extracted_slots) self._update_context_memory(session, intent.intent_id, extracted_slots) if session.routing_debug is not None: routing_debug = RoutingDebug.model_validate(session.routing_debug) routing_debug.extracted_slots = session.slots.copy() session.routing_debug = routing_debug.model_dump() timer_started_at = perf_counter() response = self._build_response_from_session(session) self._mark_timing(breakdown, "response_build_ms", timer_started_at) first_response_at = perf_counter() timer_started_at = perf_counter() self._record_turn(session, request.input_text, response.reply_text, RewriteResult(request.input_text, request.input_text)) self._mark_timing(breakdown, "record_turn_ms", timer_started_at) timer_started_at = perf_counter() self.session_store.save(session) self._mark_timing(breakdown, "session_save_ms", timer_started_at) return self._finalize_response(response, started_at, breakdown, first_response_at) def _build_response_from_session(self, session: SessionState) -> ChatResponse: assert session.current_intent is not None intent = self.intent_registry.get(session.current_intent) session.slots = self._normalize_slots_for_intent(intent.intent_id, session.slots) pending_slots = self._pending_slots_for_intent(intent, session.slots) if pending_slots: session.pending_slots = pending_slots session.status = "waiting_slot" self.session_store.save(session) first_slot = pending_slots[0] workflow = self._build_workflow(session, pending_slots) return ChatResponse( session_id=session.session_id, reply_type="ask_slot", reply_text=self.response_policy.ask_for_slot( intent, first_slot, self._default_ask_template(intent, first_slot), ), intent=session.current_intent, domain=self._domain_for_intent(session.current_intent), status=session.status, pending_slots=pending_slots, filled_slots=session.slots, workflow=workflow, routing_debug=self._routing_debug_from_session(session), trace_id=self._trace_id(), ) plugin_result = self.plugins.execute(intent.plugin_id, session.slots) session.pending_slots = [] session.status = "completed" workflow = self._build_workflow(session, []) workflow.status = "completed" self.session_store.save(session) return ChatResponse( session_id=session.session_id, reply_type="workflow_result", reply_text=self.response_policy.workflow_result(intent, plugin_result), intent=session.current_intent, domain=self._domain_for_intent(session.current_intent), status=session.status, pending_slots=[], filled_slots=session.slots, workflow=workflow, routing_debug=self._routing_debug_from_session(session), trace_id=self._trace_id(), ) def _build_workflow(self, session: SessionState, pending_slots: list[str]) -> Workflow: assert session.current_intent is not None intent = self.intent_registry.get(session.current_intent) missing = [ MissingSlot( slot_name=slot, ask_template=self._default_ask_template(intent, slot), priority=index + 1, ) for index, slot in enumerate(pending_slots) ] step = WorkflowStep( step=1, step_id=f"step_{session.current_intent}_1", intent_id=session.current_intent, plugin_id=intent.plugin_id, action=intent.plugin_id.split(".")[-1], status="pending" if pending_slots else "completed", slots=session.slots.copy(), ) workflow = Workflow( workflow_id=f"wf_{session.session_id}", workflow_type="single", domain=intent.domain, intent_id=session.current_intent, status="waiting_slot" if pending_slots else "ready", risk_level=intent.risk_level, slots=session.slots.copy(), missing_slots=missing, steps=[step], meta={ "source": "rule_router", "routing_debug": session.routing_debug or {}, }, ) session.workflow = workflow.model_dump() return workflow def _fallback_response(self, session_id: str, routing_debug: RoutingDebug | None = None) -> ChatResponse: return ChatResponse( session_id=session_id, reply_type="fallback", reply_text=self.response_policy.fallback(), decision=routing_debug.decision if routing_debug is not None else None, decision_reason=routing_debug.decision_reason if routing_debug is not None else None, status="fallback", routing_debug=routing_debug, trace_id=self._trace_id(), ) def _handle_stop_request(self, session: SessionState, text: str) -> ChatResponse | None: if not self._is_stop_request(text) or not self._has_active_task(session): return None self._reset_active_task(session) return ChatResponse( session_id=session.session_id, reply_type="text", reply_text=self.response_policy.task_stopped(), status="stopped", pending_slots=[], filled_slots={}, workflow=None, routing_debug=None, trace_id=self._trace_id(), ) def _handle_social_turn(self, session: SessionState, text: str) -> ChatResponse | None: social_route = self.social_router.route(text, session) if social_route.category == "none": return None if self.social_responder is None: reply_text = self.response_policy.open_social_fallback() return self._social_response( session=session, reply_text=reply_text, decision="open_social", decision_reason="open social matched but no responder configured", ) social_result = self.social_responder.reply(text, session) reply_text = social_result.text.strip() or self.response_policy.open_social_fallback() return self._social_response( session=session, reply_text=reply_text, decision="open_social", decision_reason=social_route.reason, ) def _social_response( self, session: SessionState, reply_text: str, decision: str, decision_reason: str, ) -> ChatResponse: session.context_memory["last_dialog_mode"] = decision pending_hint = self.response_policy.pending_task_hint( session.status, session.pending_slots, session.current_intent, ) final_reply = self.response_policy.with_pending_hint(reply_text, pending_hint) workflow = self._workflow_from_session(session) status = session.status if session.status != "idle" else "social" return ChatResponse( session_id=session.session_id, reply_type="text", reply_text=final_reply, intent=session.current_intent, domain=self._domain_for_intent(session.current_intent), decision=decision, decision_reason=decision_reason, status=status, pending_slots=list(session.pending_slots), filled_slots=session.slots.copy(), workflow=workflow, routing_debug=self._routing_debug_from_session(session), trace_id=self._trace_id(), ) def _routing_debug_from_session(self, session: SessionState) -> RoutingDebug | None: if session.routing_debug is None: return None return RoutingDebug.model_validate(session.routing_debug) def _trace_id(self) -> str: return uuid4().hex def _domain_for_intent(self, intent_id: str | None) -> str | None: """Return the domain for a given intent_id, or None if unknown.""" if intent_id is None: return None try: return self.intent_registry.get(intent_id).domain except KeyError: return None def _planner_indicates_out_of_scope(self, planner_stage: MatcherStageDebug | None) -> bool: if planner_stage is None or planner_stage.accepted: return False reason = str(planner_stage.reason or "").strip().lower() if not reason: return False out_of_scope_signals = ( "no matching intent", "there is no matching intent", "not in the catalog", "intent catalog only contains", "provided intent catalog only contains", "out of scope", "third-party app action", "outside known local capabilities", "没有匹配意图", "不在意图目录", "不在目录中", "超出能力范围", "能力之外", ) return any(signal in reason for signal in out_of_scope_signals) def _mark_timing(self, breakdown: dict[str, float], name: str, started_at: float) -> None: breakdown[name] = round((perf_counter() - started_at) * 1000, 3) def _finalize_response( self, response: ChatResponse, started_at: float, breakdown: dict[str, float], first_response_at: float | None = None, ) -> ChatResponse: first_ready_at = first_response_at or perf_counter() response.first_response_latency_ms = round((first_ready_at - started_at) * 1000, 3) response.total_latency_ms = round((perf_counter() - started_at) * 1000, 3) response.processing_breakdown = dict(breakdown) if response.routing_debug is not None: if response.routing_debug.total_match_latency_ms is not None: response.processing_breakdown.setdefault( "match_pipeline_ms", response.routing_debug.total_match_latency_ms, ) return response def _top_candidate_ids(self, routing_debug: RoutingDebug, limit: int = 3) -> list[str]: if not routing_debug.stages: return [] fusion_stage = next((stage for stage in reversed(routing_debug.stages) if stage.stage == "fusion"), None) if fusion_stage is None: return [] return [candidate.intent_id for candidate in fusion_stage.candidates[:limit]] def _plan_if_needed( self, session: SessionState, text: str, routing_debug: RoutingDebug, ) -> PlanningResult | None: if self.planner is None: return None if not self._should_use_planner(text, routing_debug): return None planner_started_at = perf_counter() result = self.planner.plan( text, self.intent_registry.list(), context={ "current_intent": session.current_intent, "slots": session.slots, "context_memory": session.context_memory, }, ) planner_elapsed_ms = round((perf_counter() - planner_started_at) * 1000, 3) self._attach_planner_debug(routing_debug, result, planner_elapsed_ms) return result def _should_use_planner(self, text: str, routing_debug: RoutingDebug) -> bool: fusion_stage = routing_debug.stages[-1] if routing_debug.stages else None has_complex_pattern = any(token in text for token in ("然后", "并且", "同时", "如果", "若", "先", "后", "顺便", "以及")) if has_complex_pattern: return True if self._looks_like_parallel_request(text, fusion_stage): return True if text.count(",") >= 1 or text.count(",") >= 1 or text.count(";") >= 1: return True if fusion_stage is not None and fusion_stage.stage == "fusion" and not fusion_stage.accepted: return True return False def _attach_planner_debug( self, routing_debug: RoutingDebug, planning_result: PlanningResult, elapsed_ms: float | None = None, ) -> None: metadata = { "workflow_type": planning_result.workflow_type, "raw_response": planning_result.raw_response, } metadata.update(planning_result.metadata) routing_debug.stages.append( MatcherStageDebug( stage="planner", accepted=planning_result.accepted, selected_intent=planning_result.steps[0].intent_id if planning_result.steps else None, score=float(len(planning_result.steps)), elapsed_ms=elapsed_ms, reason=planning_result.reason, model_name=planning_result.model_name, backend=planning_result.backend, error_message=planning_result.error_message, metadata=metadata, candidates=[ IntentCandidate( intent_id=step.intent_id, score=1.0, reason=step.reason, model_name=planning_result.model_name, metadata=planning_result.metadata.get("normalized_steps", [{}])[index] if index < len(planning_result.metadata.get("normalized_steps", [])) else {"slots": step.slots}, ) for index, step in enumerate(planning_result.steps[:5]) ], ) ) def _start_planned_workflow( self, session: SessionState, planning_result: PlanningResult, routing_debug: RoutingDebug, user_text: str, rewrite_result: RewriteResult, ) -> ChatResponse: workflow = self._build_planned_workflow(session, planning_result) session.workflow = workflow.model_dump() session.routing_debug = routing_debug.model_dump() response = self._continue_planned_workflow(session, workflow, routing_debug) self._record_turn(session, user_text, response.reply_text, rewrite_result) return response def _build_planned_workflow(self, session: SessionState, planning_result: PlanningResult) -> Workflow: steps: list[WorkflowStep] = [] first_intent = planning_result.steps[0].intent_id domain = self.intent_registry.get(first_intent).domain risk_level = self.intent_registry.get(first_intent).risk_level step_id_map = {index: f"step_{session.session_id}_{index}" for index in range(1, len(planning_result.steps) + 1)} for index, planned_step in enumerate(planning_result.steps, start=1): intent = self.intent_registry.get(planned_step.intent_id) steps.append( WorkflowStep( step=index, step_id=step_id_map[index], intent_id=planned_step.intent_id, plugin_id=intent.plugin_id, action=intent.plugin_id.split(".")[-1], status="pending", depends_on=[ step_id_map[dep_index] for dep_index in planned_step.depends_on if dep_index in step_id_map ], slots=dict(planned_step.slots), condition=dict(planned_step.condition), requires_confirmation=planned_step.requires_confirmation or self._requires_confirmation(intent), ) ) return Workflow( workflow_id=f"wf_{session.session_id}", workflow_type=planning_result.workflow_type if planning_result.workflow_type in {"single", "sequence", "conditional", "parallel"} else "sequence", domain=domain, intent_id=first_intent, status="ready", risk_level=risk_level, slots=session.slots.copy(), missing_slots=[], steps=steps, meta={ "source": planning_result.backend, "planner_model": planning_result.model_name, "planner_reason": planning_result.reason, "planner_debug": planning_result.metadata, "step_results": {}, "confirmed_steps": [], }, ) def _continue_planned_workflow( self, session: SessionState, workflow: Workflow, routing_debug: RoutingDebug | None, carry_messages: list[str] | None = None, ) -> ChatResponse: step_messages: list[str] = list(carry_messages or []) step_results = workflow.meta.setdefault("step_results", {}) confirmed_steps = workflow.meta.setdefault("confirmed_steps", []) for step in workflow.steps: if step.status in {"completed", "skipped"}: continue if step.depends_on and not all(self._is_step_completed(workflow, dependency) for dependency in step.depends_on): continue intent = self.intent_registry.get(step.intent_id) merged_slots = session.slots.copy() merged_slots.update(step.slots) merged_slots = self._normalize_slots_for_intent(intent.intent_id, merged_slots) step.slots = merged_slots.copy() session.current_intent = intent.intent_id missing_slots = self._pending_slots_for_intent(intent, merged_slots) if missing_slots: workflow.status = "waiting_slot" session.pending_slots = missing_slots session.status = "waiting_slot" workflow.missing_slots = [ MissingSlot( slot_name=slot, ask_template=self._default_ask_template(intent, slot), priority=index + 1, ) for index, slot in enumerate(missing_slots) ] session.workflow = workflow.model_dump() return ChatResponse( session_id=session.session_id, reply_type="ask_slot", reply_text=self.response_policy.ask_for_slot( intent, missing_slots[0], self._default_ask_template(intent, missing_slots[0]), ), intent=intent.intent_id, domain=intent.domain, status=session.status, pending_slots=missing_slots, filled_slots=merged_slots, workflow=workflow, routing_debug=routing_debug, trace_id=self._trace_id(), ) condition_state = self._evaluate_step_condition(step, workflow) if condition_state is False: step.status = "skipped" reason = step.condition.get("description") if step.condition else None step_messages.append(self.response_policy.step_skipped(intent, reason)) continue if step.requires_confirmation and step.step_id not in confirmed_steps: step.status = "waiting_confirmation" workflow.status = "waiting_confirmation" session.pending_slots = ["confirmation"] session.status = "waiting_confirmation" workflow.meta["pending_confirmation"] = { "step_id": step.step_id, "intent_id": intent.intent_id, "detail": step.condition.get("description") if step.condition else None, } session.workflow = workflow.model_dump() return ChatResponse( session_id=session.session_id, reply_type="ask_confirmation", reply_text=self.response_policy.ask_for_confirmation( intent, str(workflow.meta["pending_confirmation"].get("detail") or "").strip() or None, ), intent=intent.intent_id, domain=intent.domain, status=session.status, pending_slots=["confirmation"], filled_slots=merged_slots, workflow=workflow, routing_debug=routing_debug, trace_id=self._trace_id(), ) plugin_result = self.plugins.execute(intent.plugin_id, merged_slots) step.status = "completed" step_results[step.step_id] = plugin_result session.slots.update(merged_slots) self._update_context_memory(session, intent.intent_id, merged_slots) step_messages.append(self.response_policy.workflow_result(intent, plugin_result)) workflow.status = "completed" workflow.missing_slots = [] session.pending_slots = [] session.status = "completed" session.current_intent = workflow.intent_id session.workflow = workflow.model_dump() return ChatResponse( session_id=session.session_id, reply_type="workflow_result", reply_text=self.response_policy.workflow_summary(step_messages), intent=workflow.intent_id, domain=self._domain_for_intent(workflow.intent_id), status=session.status, pending_slots=[], filled_slots=session.slots, workflow=workflow, routing_debug=routing_debug, trace_id=self._trace_id(), ) def _requires_confirmation(self, intent) -> bool: return self.dialog_rules.requires_confirmation(intent.intent_id, intent.risk_level) def _is_step_completed(self, workflow: Workflow, step_id: str) -> bool: dependency = next((item for item in workflow.steps if item.step_id == step_id), None) return dependency is not None and dependency.status == "completed" def _evaluate_step_condition(self, step: WorkflowStep, workflow: Workflow) -> bool | None: if not step.condition: return None source_step_id = self._resolve_condition_source_step(step, workflow) if source_step_id is None: return None step_results = workflow.meta.get("step_results", {}) source_result = step_results.get(source_step_id, {}) if not isinstance(source_result, dict): return None field_name = str(step.condition.get("field", "")).strip() operator = str(step.condition.get("operator", "equals")).strip() or "equals" expected = step.condition.get("value") actual = source_result.get("data", {}).get(field_name) if operator == "equals": return actual == expected if operator == "not_equals": return actual != expected if operator == "in" and isinstance(expected, list): return actual in expected return None def _resolve_condition_source_step(self, step: WorkflowStep, workflow: Workflow) -> str | None: source_step = step.condition.get("source_step") try: if source_step is not None: source_index = int(source_step) matched = next((item for item in workflow.steps if item.step == source_index), None) if matched is not None: return matched.step_id except (TypeError, ValueError): return None if step.depends_on: return step.depends_on[0] return None def _handle_confirmation(self, session: SessionState, text: str) -> ChatResponse: workflow = self._workflow_from_session(session) routing_debug = self._routing_debug_from_session(session) if workflow is None: return self._fallback_response(session.session_id, routing_debug) pending = workflow.meta.get("pending_confirmation", {}) step_id = str(pending.get("step_id", "")).strip() if not step_id: return self._fallback_response(session.session_id, routing_debug) decision = self._parse_confirmation_decision(text) step = next((item for item in workflow.steps if item.step_id == step_id), None) if step is None: return self._fallback_response(session.session_id, routing_debug) intent = self.intent_registry.get(step.intent_id) if decision is None: return ChatResponse( session_id=session.session_id, reply_type="ask_confirmation", reply_text=self.response_policy.confirm_retry(), intent=intent.intent_id, domain=intent.domain, status=session.status, pending_slots=["confirmation"], filled_slots=session.slots, workflow=workflow, routing_debug=routing_debug, trace_id=self._trace_id(), ) workflow.meta.pop("pending_confirmation", None) if decision is False: step.status = "skipped" session.pending_slots = [] session.status = "running" skipped_reason = self.response_policy.confirm_cancelled() step_messages = [skipped_reason] response = self._continue_planned_workflow(session, workflow, routing_debug, carry_messages=step_messages) return response confirmed_steps = workflow.meta.setdefault("confirmed_steps", []) if step_id not in confirmed_steps: confirmed_steps.append(step_id) session.pending_slots = [] session.status = "running" return self._continue_planned_workflow(session, workflow, routing_debug) def _workflow_from_session(self, session: SessionState) -> Workflow | None: if session.workflow is None: return None return Workflow.model_validate(session.workflow) def _attach_rewrite_debug( self, routing_debug: RoutingDebug, rewrite_result: RewriteResult, elapsed_ms: float | None = None, ) -> None: routing_debug.stages.insert( 0, MatcherStageDebug( stage="rewrite", accepted=rewrite_result.applied, score=1.0 if rewrite_result.applied else 0.0, elapsed_ms=elapsed_ms, reason=rewrite_result.reason or "no rewrite needed", model_name="context-rewrite", metadata={ "original_text": rewrite_result.original_text, "rewritten_text": rewrite_result.rewritten_text, **rewrite_result.metadata, }, ), ) def _update_context_memory( self, session: SessionState, intent_id: str, extracted_slots: dict[str, object], ) -> None: session.context_memory["last_intent"] = intent_id if "temperature" in extracted_slots: session.context_memory["last_temperature"] = extracted_slots["temperature"] if "destination" in extracted_slots: session.context_memory["last_destination"] = extracted_slots["destination"] if "song" in extracted_slots: session.context_memory["last_song"] = extracted_slots["song"] if "genre" in extracted_slots: session.context_memory["last_genre"] = extracted_slots["genre"] def _normalize_slots_for_intent(self, intent_id: str, slots: dict[str, object]) -> dict[str, object]: normalized = dict(slots) if intent_id == "cabin_set_ac": temperature = self._normalize_temperature_value(normalized.get("temperature")) if temperature is not None: normalized["temperature"] = temperature else: normalized.pop("temperature", None) if intent_id == "cabin_play_music": media_query = str(normalized.get("media_query") or "").strip() if media_query and "song" not in normalized and "genre" not in normalized: if any(genre in media_query for genre in self._MUSIC_GENRE_HINTS): matched = next((genre for genre in self._MUSIC_GENRE_HINTS if genre in media_query), None) if matched: normalized["genre"] = matched else: normalized["song"] = media_query normalized.pop("media_query", None) return normalized def _continue_active_task(self, session: SessionState, text: str) -> ChatResponse | None: if session.current_intent is None: return None if session.status not in {"waiting_slot", "waiting_confirmation"}: return None if session.status == "waiting_confirmation": return None intent = self.intent_registry.get(session.current_intent) extracted_slots = self._extract_continuation_slots(session, intent, text) if not extracted_slots: return None session.slots.update(extracted_slots) self._update_context_memory(session, intent.intent_id, extracted_slots) if session.routing_debug is not None: routing_debug = RoutingDebug.model_validate(session.routing_debug) routing_debug.extracted_slots = session.slots.copy() session.routing_debug = routing_debug.model_dump() existing_workflow = self._workflow_from_session(session) if existing_workflow is not None and existing_workflow.workflow_type != "single": return self._continue_planned_workflow( session, existing_workflow, self._routing_debug_from_session(session), ) return self._build_response_from_session(session) def _extract_continuation_slots( self, session: SessionState, intent, text: str, ) -> dict[str, object]: extracted = self.router.extract_slots(text, intent) if extracted: return extracted normalized = text.strip() if not normalized: return {} candidate_texts = self._continuation_candidate_texts(intent.intent_id, normalized, session.pending_slots) merged: dict[str, object] = {} for candidate_text in candidate_texts: candidate_slots = self.router.extract_slots(candidate_text, intent) if candidate_slots: merged.update(candidate_slots) break if merged: return merged if intent.intent_id == "cabin_play_music" and self._requires_music_query(session.pending_slots): compact = normalized.strip(",。,.!?!? ") if not compact: return {} if self.dialog_act_engine.detect(compact) in {"thanks", "greeting", "goodbye"}: return {} matched_genre = next((genre for genre in self._MUSIC_GENRE_HINTS if genre in compact), None) if matched_genre is not None: return {"genre": matched_genre} if len(compact) <= 20: return {"song": compact} return {} def _continuation_candidate_texts( self, intent_id: str, text: str, pending_slots: list[str], ) -> list[str]: pending = list(pending_slots) if intent_id == "cabin_play_music" and self._requires_music_query(pending): return [f"播放{text}", f"来一首{text}", f"来点{text}"] if intent_id == "cabin_nav_to" and "destination" in pending: return [f"导航去{text}", f"带我去{text}"] if intent_id == "cabin_set_ac" and "temperature" in pending: return [f"把空调调到{text}", f"温度设成{text}"] if intent_id in {"cs_query_order", "cs_query_logistics", "cs_cancel_order"} and "order_id" in pending: prefix = { "cs_query_order": "查一下订单", "cs_query_logistics": "查一下物流", "cs_cancel_order": "取消订单", }.get(intent_id, "") return [f"{prefix}{text}"] if prefix else [text] return [] def _pending_slots_for_intent(self, intent, slots: dict[str, object]) -> list[str]: pending_slots = [slot for slot in intent.required_slots if slot not in slots] if intent.intent_id == "cabin_play_music" and not self._has_music_query(slots): pending_slots.append("media_query") deduped: list[str] = [] for slot in pending_slots: if slot not in deduped: deduped.append(slot) return deduped def _default_ask_template(self, intent, slot_name: str) -> str: if slot_name in intent.ask_templates: return str(intent.ask_templates[slot_name]) if slot_name == "media_query": return "想听什么风格或者具体的歌名?" return "请补充一个关键信息。" def _has_music_query(self, slots: dict[str, object]) -> bool: return bool(str(slots.get("song") or "").strip() or str(slots.get("genre") or "").strip()) def _requires_music_query(self, pending_slots: list[str]) -> bool: return "media_query" in pending_slots or not pending_slots def _looks_like_parallel_request(self, text: str, fusion_stage: MatcherStageDebug | None) -> bool: _ = fusion_stage compact = re.sub(r"\s+", "", text) return bool(re.search(r"(和|跟|以及|外加).+", compact) and len(compact) >= 5) def _normalize_temperature_value(self, raw_value: object) -> int | None: if isinstance(raw_value, bool): return None if isinstance(raw_value, (int, float)): value = int(raw_value) elif isinstance(raw_value, str): match = re.search(r"-?\d+", raw_value) if not match: return None value = int(match.group(0)) else: return None return max(16, min(30, value)) def _record_turn( self, session: SessionState, user_text: str, agent_text: str, rewrite_result: RewriteResult, ) -> None: session.last_user_text = user_text session.last_agent_text = agent_text session.context_memory["last_raw_user_text"] = user_text session.context_memory["last_rewritten_text"] = rewrite_result.rewritten_text def _update_dialog_act(self, session: SessionState, text: str) -> None: session.context_memory["last_dialog_act"] = self.dialog_act_engine.detect(text) def _has_active_task(self, session: SessionState) -> bool: if session.status in {"waiting_slot", "waiting_confirmation", "running", "understanding"}: return True workflow = self._workflow_from_session(session) if workflow is not None and workflow.status not in {"completed", "failed"}: return True return False def _is_stop_request(self, text: str) -> bool: return self.dialog_rules.is_stop_request(text) def _reset_active_task(self, session: SessionState) -> None: session.status = "idle" session.current_intent = None session.pending_slots = [] session.slots = {} session.workflow = None session.routing_debug = None def _parse_confirmation_decision(self, text: str) -> bool | None: return self.dialog_rules.parse_confirmation_decision(text) def _try_knowledge_llm( self, session: SessionState, user_text: str, routing_debug: RoutingDebug, rewrite_result: RewriteResult, ) -> ChatResponse | None: """BERT 未命中时尝试 LLM + knowledge_search function call。 有结果返回 ChatResponse(含 knowledge_content),无结果返回 None。 """ if self.knowledge_llm is None: return None result = self.knowledge_llm.reply(user_text) # LLM 调用失败或本地兜底但没有文档 → 退回原始 fallback if result.backend == "local-fallback" and result.doc_id is None: return None return ChatResponse( session_id=session.session_id, reply_type="text", reply_text=result.reply_text, intent=None, domain="knowledge", decision="knowledge_llm", decision_reason=( f"BERT 未命中,LLM 通过 knowledge_search 命中文档:{result.doc_id}" if result.doc_id else "BERT 未命中,LLM 自主回答" ), status="completed", knowledge_doc_id=result.doc_id, knowledge_doc_title=result.doc_title, knowledge_content=result.doc_content, routing_debug=routing_debug, trace_id=self._trace_id(), )