from __future__ import annotations import json import re from dataclasses import dataclass, field from typing import Any, Protocol from urllib import error, request from app.services.classifier import ClassificationResult, IntentClassifier from app.services.joint_nlu import JointBertNLU from app.services.multi_intent_detector import MultiIntentDetectionResult, MultiIntentDetector from app.schemas.configuration import WorkflowTemplateDefinition, WorkflowTemplatesConfig from app.schemas.intent import IntentDefinition @dataclass class PlannedStep: intent_id: str slots: dict[str, Any] = field(default_factory=dict) depends_on: list[int] = field(default_factory=list) condition: dict[str, Any] = field(default_factory=dict) requires_confirmation: bool = False reason: str | None = None @dataclass class PlanningResult: accepted: bool workflow_type: str = "single" steps: list[PlannedStep] = field(default_factory=list) model_name: str = "heuristic-planner" backend: str = "local" reason: str | None = None error_message: str | None = None raw_response: str | None = None metadata: dict[str, Any] = field(default_factory=dict) @dataclass class ClauseParseResult: clause_text: str selected_intent_id: str | None = None score: float = 0.0 slots: dict[str, Any] = field(default_factory=dict) reason: str | None = None candidates: list[dict[str, Any]] = field(default_factory=list) @dataclass class MultiIntentParseResult: clauses: list[ClauseParseResult] = field(default_factory=list) workflow_type: str = "single" detected: bool = False reason: str | None = None detector: MultiIntentDetectionResult | None = None @property def matched_ids(self) -> list[str]: return [clause.selected_intent_id for clause in self.clauses if clause.selected_intent_id] @property def matched_clauses(self) -> list[ClauseParseResult]: return [clause for clause in self.clauses if clause.selected_intent_id] class WorkflowPlanner(Protocol): def plan( self, text: str, intents: list[IntentDefinition], context: dict[str, 
Any] | None = None, ) -> PlanningResult: ... class TemplateWorkflowPlanner: def __init__( self, templates: WorkflowTemplatesConfig | None = None, clause_classifier: IntentClassifier | None = None, multi_intent_detector: MultiIntentDetector | None = None, joint_nlu: JointBertNLU | None = None, classifier_weight: float = 1.6, model_only_threshold: float = 0.62, ) -> None: self._templates = templates or WorkflowTemplatesConfig() self._clause_classifier = clause_classifier self._multi_intent_detector = multi_intent_detector self._joint_nlu = joint_nlu self._classifier_weight = classifier_weight self._model_only_threshold = model_only_threshold def plan( self, text: str, intents: list[IntentDefinition], context: dict[str, Any] | None = None, ) -> PlanningResult: _ = context if not self._templates.templates: return PlanningResult( accepted=False, model_name="template-planner", backend="local-template", reason="no workflow templates configured", ) parse_result = _analyze_multi_intent( text, intents, clause_classifier=self._clause_classifier, multi_intent_detector=self._multi_intent_detector, joint_nlu=self._joint_nlu, classifier_weight=self._classifier_weight, model_only_threshold=self._model_only_threshold, ) matched_ids = parse_result.matched_ids if len(matched_ids) < 2: return PlanningResult( accepted=False, model_name="template-planner", backend="local-template", reason="not enough matched clauses for workflow template", metadata={ "matched_intents": matched_ids, "input_clauses": [item.clause_text for item in parse_result.clauses], "clause_analysis": _serialize_clause_analysis(parse_result), "multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), }, ) for template in self._templates.templates: if self._matches_template(text, matched_ids, template): steps = self._build_steps(template, parse_result) return PlanningResult( accepted=True, workflow_type=template.workflow_type, steps=steps, 
model_name="template-planner", backend="local-template", reason=f"matched workflow template: {template.template_id}", metadata={ "template_id": template.template_id, "input_clauses": [item.clause_text for item in parse_result.clauses], "matched_intents": matched_ids, "clause_analysis": _serialize_clause_analysis(parse_result), "multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), "normalized_steps": [ { "intent_id": step.intent_id, "slots": step.slots, "depends_on": step.depends_on, "condition": step.condition, "requires_confirmation": step.requires_confirmation, } for step in steps ], }, ) return PlanningResult( accepted=False, model_name="template-planner", backend="local-template", reason="no workflow template matched the current clause intents", metadata={ "matched_intents": matched_ids, "input_clauses": [item.clause_text for item in parse_result.clauses], "clause_analysis": _serialize_clause_analysis(parse_result), "multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), }, ) def _matches_template(self, text: str, matched_ids: list[str], template: WorkflowTemplateDefinition) -> bool: if template.trigger_keywords and not all(keyword in text for keyword in template.trigger_keywords): return False return matched_ids[: len(template.intent_sequence)] == template.intent_sequence def _build_steps( self, template: WorkflowTemplateDefinition, parse_result: MultiIntentParseResult, ) -> list[PlannedStep]: steps: list[PlannedStep] = [] matched_clauses = parse_result.matched_clauses for index, intent_id in enumerate(template.intent_sequence): clause_result = matched_clauses[index] if index < len(matched_clauses) else None clause_text = clause_result.clause_text if clause_result is not None else "" override = template.step_overrides[index] if index < len(template.step_overrides) else None steps.append( PlannedStep( 
intent_id=intent_id, slots=_extract_slots_for_intent( clause_text or " ".join(item.clause_text for item in parse_result.clauses), intent_id, joint_nlu=self._joint_nlu, ), depends_on=list(override.depends_on) if override is not None else [], condition=dict(override.condition) if override is not None else {}, requires_confirmation=bool(override.requires_confirmation) if override is not None else False, reason=f"workflow template matched clause {index + 1}: {clause_result.selected_intent_id if clause_result is not None else intent_id}", ) ) return steps class CompositeWorkflowPlanner: def __init__(self, planners: list[WorkflowPlanner]) -> None: self._planners = planners def plan( self, text: str, intents: list[IntentDefinition], context: dict[str, Any] | None = None, ) -> PlanningResult: last_result = PlanningResult(accepted=False, reason="no planner configured") for planner in self._planners: result = planner.plan(text, intents, context) last_result = result if result.accepted: return result return last_result class HeuristicWorkflowPlanner: def __init__( self, clause_classifier: IntentClassifier | None = None, multi_intent_detector: MultiIntentDetector | None = None, joint_nlu: JointBertNLU | None = None, classifier_weight: float = 1.6, model_only_threshold: float = 0.62, ) -> None: self._clause_classifier = clause_classifier self._multi_intent_detector = multi_intent_detector self._joint_nlu = joint_nlu self._classifier_weight = classifier_weight self._model_only_threshold = model_only_threshold def plan( self, text: str, intents: list[IntentDefinition], context: dict[str, Any] | None = None, ) -> PlanningResult: _ = context parse_result = _analyze_multi_intent( text, intents, clause_classifier=self._clause_classifier, multi_intent_detector=self._multi_intent_detector, joint_nlu=self._joint_nlu, classifier_weight=self._classifier_weight, model_only_threshold=self._model_only_threshold, ) if not parse_result.detected and not _has_complex_pattern(text): return 
PlanningResult( accepted=False, model_name="heuristic-planner", backend="local-heuristic", reason="single command or no explicit planning pattern detected", metadata={ "input_clauses": [item.clause_text for item in parse_result.clauses], "clause_analysis": _serialize_clause_analysis(parse_result), "multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), }, ) steps: list[PlannedStep] = [] for clause in parse_result.matched_clauses: if clause.selected_intent_id is None: continue steps.append( PlannedStep( intent_id=clause.selected_intent_id, slots=clause.slots.copy(), reason=clause.reason or f"heuristic parse for clause: {clause.clause_text}", ) ) workflow_type = parse_result.workflow_type if workflow_type == "conditional": steps = _apply_conditional_hints(text, steps) if len(steps) >= 2: return PlanningResult( accepted=True, workflow_type=workflow_type, steps=steps, model_name="heuristic-planner", backend="local-heuristic", reason=parse_result.reason or "heuristic planner split the request into multiple executable clauses", metadata={ "input_clauses": [item.clause_text for item in parse_result.clauses], "clause_analysis": _serialize_clause_analysis(parse_result), "multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), "normalized_steps": [ { "intent_id": step.intent_id, "slots": step.slots, "depends_on": step.depends_on, "condition": step.condition, "requires_confirmation": step.requires_confirmation, "reason": step.reason, } for step in steps ], }, ) return PlanningResult( accepted=False, workflow_type=workflow_type, steps=steps, model_name="heuristic-planner", backend="local-heuristic", reason="planning detected but local heuristic could not produce enough executable steps", metadata={ "input_clauses": [item.clause_text for item in parse_result.clauses], "clause_analysis": _serialize_clause_analysis(parse_result), 
"multi_intent_detected": parse_result.detected, "multi_intent_detector": _serialize_multi_intent_detector(parse_result.detector), }, ) class DashScopeWorkflowPlanner: def __init__( self, base_url: str, api_key: str, model_name: str, timeout_seconds: float = 6.0, fallback: WorkflowPlanner | None = None, joint_nlu: JointBertNLU | None = None, ) -> None: self._base_url = base_url.rstrip("/") self._api_key = api_key self._model_name = model_name self._timeout_seconds = timeout_seconds self._fallback = fallback or HeuristicWorkflowPlanner(joint_nlu=joint_nlu) self._joint_nlu = joint_nlu def plan( self, text: str, intents: list[IntentDefinition], context: dict[str, Any] | None = None, ) -> PlanningResult: if not self._base_url or not self._api_key or not self._model_name: return self._fallback_result( text, intents, context, reason="planner is not configured", error_message="set AGENT_PLANNER_BASE_URL / AGENT_PLANNER_API_KEY / AGENT_PLANNER_MODEL_NAME", ) payload = { "model": self._model_name, "temperature": 0.1, "enable_thinking": False, "max_tokens": 320, "messages": [ { "role": "system", "content": self._system_prompt(intents), }, { "role": "user", "content": json.dumps( { "text": text, "clauses": _split_clauses(text), "context": context or {}, }, ensure_ascii=False, ), }, ], } req = request.Request( self._endpoint(), data=json.dumps(payload).encode("utf-8"), headers={ "Content-Type": "application/json", "Authorization": f"Bearer {self._api_key}", }, method="POST", ) try: with request.urlopen(req, timeout=self._timeout_seconds) as response: data = json.loads(response.read().decode("utf-8")) except (error.URLError, TimeoutError, ValueError) as exc: return self._fallback_result( text, intents, context, reason="cloud planner request failed", error_message=str(exc), ) content = self._extract_content(data) if not content: return self._fallback_result( text, intents, context, reason="cloud planner returned empty content", raw_response=json.dumps(data, ensure_ascii=False), ) 
try: plan_data = self._parse_json_content(content) except ValueError as exc: return self._fallback_result( text, intents, context, reason="cloud planner returned invalid json", error_message=str(exc), raw_response=content, ) steps, normalization_meta = self._normalize_steps(plan_data, text, intents) accepted = bool(plan_data.get("accepted", bool(steps))) workflow_type = str(plan_data.get("workflow_type", "single" if len(steps) <= 1 else "sequence")) return PlanningResult( accepted=accepted, workflow_type=workflow_type, steps=steps, model_name=self._model_name, backend="dashscope", reason=str(plan_data.get("reason", "")).strip() or "cloud planner parsed successfully", raw_response=content, metadata={ "input_clauses": _split_clauses(text), "parsed_plan": plan_data, "normalized_steps": normalization_meta, }, ) def _endpoint(self) -> str: if self._base_url.endswith("/chat/completions"): return self._base_url return f"{self._base_url}/chat/completions" def _system_prompt(self, intents: list[IntentDefinition]) -> str: intent_catalog = [ { "intent_id": intent.intent_id, "required_slots": intent.required_slots, "keywords": intent.keywords[:4], } for intent in intents ] return ( "Plan user requests into executable steps for a cabin/service assistant. " "Return strict JSON only, no markdown. " "Use only intent_id values from the catalog. " "If the request is a single clear command, return one step. " "If it contains multiple commands, return sequence steps. " "Fill slots whenever the value is explicit in the user text. " "Examples: order_id like A123456, destination like 公司/机场, temperature as integer, music may use song or genre such as 轻音乐. " "Each step should only contain slots relevant to that intent. " "For conditional requests, mark workflow_type as conditional and attach depends_on plus condition. " "Use requires_confirmation=true for risky actions such as canceling an order. 
" 'Schema: {"accepted":true,"workflow_type":"single|sequence|conditional","reason":"brief","steps":[{"intent_id":"...","slots":{},"depends_on":[1],"condition":{"field":"order_status","operator":"equals","value":"pending_shipment","description":"only cancel if not shipped"},"requires_confirmation":true,"reason":"brief"}]}. ' f"Catalog={json.dumps(intent_catalog, ensure_ascii=False, separators=(',', ':'))}" ) def _extract_content(self, data: dict[str, Any]) -> str: choices = data.get("choices") if not isinstance(choices, list) or not choices: return "" message = choices[0].get("message", {}) content = message.get("content", "") if isinstance(content, str): return content.strip() if isinstance(content, list): text_parts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": text_parts.append(str(item.get("text", ""))) return "".join(text_parts).strip() return "" def _parse_json_content(self, content: str) -> dict[str, Any]: cleaned = content.strip() if cleaned.startswith("```"): cleaned = cleaned.strip("`") if cleaned.startswith("json"): cleaned = cleaned[4:].strip() start = cleaned.find("{") end = cleaned.rfind("}") if start < 0 or end < 0 or end <= start: raise ValueError("planner content does not contain a JSON object") return json.loads(cleaned[start : end + 1]) def _normalize_steps( self, plan_data: dict[str, Any], text: str, intents: list[IntentDefinition], ) -> tuple[list[PlannedStep], list[dict[str, Any]]]: intent_map = {intent.intent_id: intent for intent in intents} clauses = _split_clauses(text) raw_steps = plan_data.get("steps", []) steps: list[PlannedStep] = [] normalization_meta: list[dict[str, Any]] = [] if not isinstance(raw_steps, list): return steps, normalization_meta for index, raw_step in enumerate(raw_steps): if not isinstance(raw_step, dict): continue intent_id = str(raw_step.get("intent_id", "")).strip() if not intent_id or intent_id not in intent_map: continue cloud_slots = raw_step.get("slots", {}) if not 
isinstance(cloud_slots, dict): cloud_slots = {} clause_text = clauses[index] if index < len(clauses) else text clause_slots = _extract_slots_for_intent(clause_text, intent_id, joint_nlu=self._joint_nlu) full_text_slots = _extract_slots_for_intent(text, intent_id, joint_nlu=self._joint_nlu) normalized_slots = self._merge_slots(intent_id, cloud_slots, clause_slots, full_text_slots) steps.append( PlannedStep( intent_id=intent_id, slots=normalized_slots, depends_on=self._normalize_depends_on(raw_step.get("depends_on")), condition=self._normalize_condition(intent_id, raw_step.get("condition")), requires_confirmation=bool(raw_step.get("requires_confirmation", False)), reason=str(raw_step.get("reason", "")).strip() or None, ) ) normalization_meta.append( { "intent_id": intent_id, "clause_text": clause_text, "cloud_slots": cloud_slots, "clause_slots": clause_slots, "full_text_slots": full_text_slots, "normalized_slots": normalized_slots, "depends_on": steps[-1].depends_on, "condition": steps[-1].condition, "requires_confirmation": steps[-1].requires_confirmation, } ) inferred_type = str(plan_data.get("workflow_type", "")) if inferred_type == "conditional" or _has_conditional_pattern(text): steps = _apply_conditional_hints(text, steps) for index, step in enumerate(steps): if index < len(normalization_meta): normalization_meta[index]["depends_on"] = step.depends_on normalization_meta[index]["condition"] = step.condition normalization_meta[index]["requires_confirmation"] = step.requires_confirmation return steps, normalization_meta def _merge_slots( self, intent_id: str, cloud_slots: dict[str, Any], clause_slots: dict[str, Any], full_text_slots: dict[str, Any], ) -> dict[str, Any]: merged: dict[str, Any] = {} for source in (full_text_slots, clause_slots, cloud_slots): for key, value in source.items(): if value in ("", None, []): continue merged[key] = value allowed_slot_keys = { "cs_query_order": {"order_id"}, "cs_query_logistics": {"order_id"}, "cs_cancel_order": 
{"order_id"}, "cabin_nav_to": {"destination"}, "cabin_set_ac": {"temperature"}, "cabin_play_music": {"song", "genre"}, }.get(intent_id) if allowed_slot_keys is None: return merged return {key: value for key, value in merged.items() if key in allowed_slot_keys} def _normalize_depends_on(self, raw_value: Any) -> list[int]: if not isinstance(raw_value, list): return [] result: list[int] = [] for item in raw_value: try: index = int(item) except (TypeError, ValueError): continue if index > 0: result.append(index) return result def _normalize_condition(self, intent_id: str, raw_value: Any) -> dict[str, Any]: if not isinstance(raw_value, dict): return {} normalized: dict[str, Any] = {} for key in ("field", "operator", "value", "description"): if key in raw_value and raw_value[key] not in ("", None): normalized[key] = raw_value[key] source_step = raw_value.get("source_step") try: if source_step is not None and int(source_step) > 0: normalized["source_step"] = int(source_step) except (TypeError, ValueError): pass if ( intent_id == "cs_cancel_order" and normalized.get("field") == "order_status" and normalized.get("value") == "pending_shipment" ): normalized["description"] = "仅在订单未发货时取消" return normalized def _fallback_result( self, text: str, intents: list[IntentDefinition], context: dict[str, Any] | None, reason: str, error_message: str | None = None, raw_response: str | None = None, ) -> PlanningResult: fallback = self._fallback.plan(text, intents, context) fallback.reason = reason fallback.error_message = error_message fallback.raw_response = raw_response fallback.backend = "dashscope-fallback" return fallback def _extract_slots_for_intent( text: str, intent_id: str, joint_nlu: JointBertNLU | None = None, ) -> dict[str, Any]: if joint_nlu is not None: predicted = joint_nlu.extract_slots_by_intent_id(text, intent_id) if predicted: return predicted slots: dict[str, Any] = {} cleaned_text = text.strip() if not cleaned_text: return slots order_id_match = 
re.search(r"\b[A-Za-z]\d{5,}\b", cleaned_text) if order_id_match: slots["order_id"] = order_id_match.group(0) temperature_match = re.search(r"(\d{2})\s*度", cleaned_text) if temperature_match: slots["temperature"] = int(temperature_match.group(1)) if intent_id == "cabin_nav_to": destination = _extract_destination(cleaned_text) if destination: slots["destination"] = destination if intent_id == "cabin_play_music": music_slots = _extract_music_slots(cleaned_text) slots.update(music_slots) return slots def _extract_destination(text: str) -> str | None: for pattern in ( r"导航去(?P.+)", r"导航到(?P.+)", r"带我去(?P.+)", r"去(?P.+)", r"到(?P.+)", ): match = re.search(pattern, text) if match: destination = re.split(r"(?:然后|并且|同时|再|,|,|;|;)", match.group("destination"), maxsplit=1)[0] destination = destination.strip(" ,。") if destination: return destination return None def _extract_music_slots(text: str) -> dict[str, Any]: slots: dict[str, Any] = {} genre_keywords = ("轻音乐", "摇滚", "古典", "民谣", "爵士", "流行", "儿歌") for genre in genre_keywords: if genre in text: slots["genre"] = genre break for trigger in ("播放", "来点", "放点", "听", "来首", "来一首"): if trigger in text: raw_target = text.split(trigger, maxsplit=1)[-1] raw_target = re.split(r"(?:然后|并且|同时|再|,|,|;|;)", raw_target, maxsplit=1)[0].strip(" ,。") matched_genre = next((genre for genre in genre_keywords if genre in raw_target), None) if matched_genre: slots["genre"] = matched_genre break target = raw_target.strip(" 的一首首个歌曲音乐吧呀啊,。") if target and target not in {"歌", "音乐"} and len(target) > 1: slots["song"] = target break if "song" in slots and slots["song"] in genre_keywords: slots["genre"] = slots.pop("song") return slots def _analyze_multi_intent( text: str, intents: list[IntentDefinition], clause_classifier: IntentClassifier | None = None, multi_intent_detector: MultiIntentDetector | None = None, joint_nlu: JointBertNLU | None = None, classifier_weight: float = 1.6, model_only_threshold: float = 0.62, ) -> MultiIntentParseResult: 
detector_result = multi_intent_detector.detect(text, intents) if multi_intent_detector is not None else None detector_prior = _build_detector_prior(detector_result) clauses = _split_clauses(text) parsed_clauses: list[ClauseParseResult] = [] for clause in clauses: parsed_clauses.extend( _parse_clause_candidates( clause, intents, full_text=text, clause_classifier=clause_classifier, detector_prior=detector_prior, joint_nlu=joint_nlu, classifier_weight=classifier_weight, model_only_threshold=model_only_threshold, ) ) matched_count = sum(1 for item in parsed_clauses if item.selected_intent_id is not None) has_multi_connector = len(clauses) >= 2 or _has_complex_pattern(text) workflow_type = "conditional" if _has_conditional_pattern(text) else "sequence" detected = matched_count >= 2 and has_multi_connector if matched_count < 2: workflow_type = "single" reason = ( f"detected {matched_count} executable clauses from multi-intent utterance" if detected else "did not detect multiple executable clauses" ) return MultiIntentParseResult( clauses=parsed_clauses, workflow_type=workflow_type, detected=detected, reason=reason, detector=detector_result, ) def _parse_clause_candidates( clause: str, intents: list[IntentDefinition], full_text: str | None = None, clause_classifier: IntentClassifier | None = None, detector_prior: dict[str, float] | None = None, joint_nlu: JointBertNLU | None = None, classifier_weight: float = 1.6, model_only_threshold: float = 0.62, ) -> list[ClauseParseResult]: cleaned_clause = clause.strip() if not cleaned_clause: return [ClauseParseResult(clause_text=clause)] scored: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = [] for intent in intents: score, reasons, signal_meta = _score_intent_for_clause(cleaned_clause, intent, full_text=full_text) if score > 0: scored.append((intent, score, reasons, signal_meta)) scored = _fuse_clause_scores( clause=cleaned_clause, intents=intents, heuristic_scored=scored, 
classifier_result=_predict_clause_with_classifier(cleaned_clause, intents, clause_classifier), detector_prior=detector_prior or {}, classifier_weight=classifier_weight, model_only_threshold=model_only_threshold, ) candidates = [ { "intent_id": intent.intent_id, "score": round(score, 4), "reason": "; ".join(reasons[:3]) if reasons else "heuristic clause score", "heuristic_score": round(float(signal_meta.get("heuristic_score", score)), 4), "model_score": round(float(signal_meta.get("model_score", 0.0)), 4), } for intent, score, reasons, signal_meta in scored[:5] ] if not scored or scored[0][1] < 1.1: return [ ClauseParseResult( clause_text=cleaned_clause, candidates=candidates, reason="no clause candidate scored above multi-intent threshold", ) ] parallel_candidates = _collect_parallel_clause_candidates(cleaned_clause, scored) if len(parallel_candidates) >= 2: return [ ClauseParseResult( clause_text=cleaned_clause, selected_intent_id=intent.intent_id, score=score, slots=_extract_slots_for_intent(cleaned_clause, intent.intent_id, joint_nlu=joint_nlu), reason="; ".join(reasons[:4]) if reasons else "heuristic clause match", candidates=candidates, ) for intent, score, reasons, _signal_meta in parallel_candidates ] intent, score, reasons, _signal_meta = scored[0] return [ ClauseParseResult( clause_text=cleaned_clause, selected_intent_id=intent.intent_id, score=score, slots=_extract_slots_for_intent(cleaned_clause, intent.intent_id, joint_nlu=joint_nlu), reason="; ".join(reasons[:4]) if reasons else "heuristic clause match", candidates=candidates, ) ] def _score_intent_for_clause( clause: str, intent: IntentDefinition, full_text: str | None = None, ) -> tuple[float, list[str], dict[str, Any]]: score = 0.0 reasons: list[str] = [] signal_meta = _collect_intent_signal_meta(clause, intent, full_text=full_text) for keyword in intent.keywords: if keyword and keyword in clause: score += 2.0 reasons.append(f"keyword:{keyword}") for example in intent.examples: if example and 
example in clause: score += 1.2 reasons.append(f"example:{example}") action_hits = signal_meta["action_hits"] object_hits = signal_meta["object_hits"] qualifier_hits = signal_meta["qualifier_hits"] if action_hits: score += 0.8 reasons.append(f"action:{action_hits[0]}") if object_hits: score += 0.8 reasons.append(f"object:{object_hits[0]}") if action_hits and object_hits: score += 0.9 reasons.append("action_object_pair") if qualifier_hits: score += 0.5 reasons.append(f"qualifier:{qualifier_hits[0]}") shared_context_hits = signal_meta["shared_context_hits"] if shared_context_hits and (action_hits or object_hits): score += 0.45 reasons.append(f"context:{shared_context_hits[0]}") if intent.intent_id == "cabin_set_ac" and re.search(r"\d{2}\s*度", clause): score += 1.1 reasons.append("explicit_temperature") signal_meta["explicit_slot"] = True if intent.intent_id in {"cs_query_order", "cs_query_logistics", "cs_cancel_order"} and re.search(r"\b[A-Za-z]\d{5,}\b", clause): score += 0.7 reasons.append("explicit_order_id") signal_meta["explicit_slot"] = True return score, reasons, signal_meta def _collect_intent_signal_meta( clause: str, intent: IntentDefinition, full_text: str | None = None, ) -> dict[str, Any]: hint_config = _INTENT_HINTS.get(intent.intent_id, {}) action_hits = [token for token in hint_config.get("actions", ()) if token in clause] object_hits = [token for token in hint_config.get("objects", ()) if token in clause] qualifier_hits = [token for token in hint_config.get("qualifiers", ()) if token in clause] shared_context_hits: list[str] = [] if full_text and full_text != clause: shared_context_hits = [ token for token in hint_config.get("shared_context", ()) if token in full_text and token not in clause ] return { "action_hits": action_hits, "object_hits": object_hits, "qualifier_hits": qualifier_hits, "shared_context_hits": shared_context_hits, "explicit_slot": False, "family": str(hint_config.get("family") or intent.intent_id), } def 
_predict_clause_with_classifier( clause: str, intents: list[IntentDefinition], clause_classifier: IntentClassifier | None, ) -> ClassificationResult | None: if clause_classifier is None: return None try: return clause_classifier.predict(clause, intents) except Exception: return None def _fuse_clause_scores( clause: str, intents: list[IntentDefinition], heuristic_scored: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]], classifier_result: ClassificationResult | None, detector_prior: dict[str, float], classifier_weight: float, model_only_threshold: float, ) -> list[tuple[IntentDefinition, float, list[str], dict[str, Any]]]: intent_map = {intent.intent_id: intent for intent in intents} classifier_candidates = _extract_classifier_candidates(classifier_result, intent_map) merged: dict[str, dict[str, Any]] = {} for intent, score, reasons, signal_meta in heuristic_scored: merged[intent.intent_id] = { "intent": intent, "heuristic_score": score, "model_score": 0.0, "reasons": list(reasons), "signal_meta": dict(signal_meta), } for item in classifier_candidates: intent = item["intent"] if intent is None: continue bucket = merged.setdefault( intent.intent_id, { "intent": intent, "heuristic_score": 0.0, "model_score": 0.0, "reasons": [], "signal_meta": _collect_intent_signal_meta(clause, intent), }, ) bucket["model_score"] = max(bucket["model_score"], item["score"]) bucket["reasons"].append(f"bert:{item['score']:.3f}") fused: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = [] for item in merged.values(): heuristic_score = float(item["heuristic_score"]) model_score = float(item["model_score"]) detector_score = float(detector_prior.get(item["intent"].intent_id, 0.0)) if heuristic_score <= 0 and not _allow_model_bootstrap( clause, model_score, classifier_candidates, model_only_threshold, ): continue fused_score = heuristic_score + model_score * classifier_weight if detector_score > 0: fused_score += detector_score * 0.55 
item["reasons"].append(f"multi:{detector_score:.3f}") if heuristic_score > 0 and model_score > 0: fused_score += 0.25 item["reasons"].append("heuristic_bert_agree") signal_meta = dict(item["signal_meta"]) signal_meta["heuristic_score"] = heuristic_score signal_meta["model_score"] = model_score signal_meta["detector_score"] = detector_score if heuristic_score <= 0 and model_score >= model_only_threshold: item["reasons"].append("bert_bootstrap") fused.append((item["intent"], fused_score, item["reasons"], signal_meta)) fused.sort(key=lambda row: row[1], reverse=True) return fused def _build_detector_prior(detector_result: MultiIntentDetectionResult | None) -> dict[str, float]: if detector_result is None: return {} prior: dict[str, float] = {} for candidate in detector_result.candidates: if candidate.score <= 0: continue prior[candidate.intent_id] = candidate.score return prior def _extract_classifier_candidates( classifier_result: ClassificationResult | None, intent_map: dict[str, IntentDefinition], ) -> list[dict[str, Any]]: if classifier_result is None: return [] normalized: list[dict[str, Any]] = [] for item in classifier_result.raw_candidates or []: if not isinstance(item, dict): continue intent_id = str(item.get("intent_id") or "") intent = intent_map.get(intent_id) if intent is None: continue normalized.append( { "intent": intent, "intent_id": intent_id, "score": float(item.get("score", 0.0)), "label": str(item.get("label") or intent_id), } ) if normalized: return normalized if classifier_result.candidates: return [ { "intent": intent, "intent_id": intent.intent_id, "score": float(score), "label": intent.intent_id, } for intent, score in classifier_result.candidates ] return [] def _allow_model_bootstrap( clause: str, model_score: float, classifier_candidates: list[dict[str, Any]], model_only_threshold: float, ) -> bool: if model_score < model_only_threshold: return False compact_clause = re.sub(r"\s+", "", clause) if len(compact_clause) < 4 and not 
re.search(r"\d{2}\s*度|\b[A-Za-z]\d{5,}\b", clause): return False runner_up_score = 0.0 if len(classifier_candidates) >= 2: runner_up_score = float(classifier_candidates[1].get("score", 0.0)) return model_score - runner_up_score >= 0.18 def _collect_parallel_clause_candidates( clause: str, scored: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]], ) -> list[tuple[IntentDefinition, float, list[str], dict[str, Any]]]: if not _looks_like_parallel_clause(clause): return [] selected: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = [] used_families: set[str] = set() strong_candidates = [item for item in scored if item[1] >= 1.35] for intent, score, reasons, signal_meta in strong_candidates: family = str(signal_meta.get("family") or intent.intent_id) if family in used_families: continue has_strong_anchor = bool(signal_meta["object_hits"] or signal_meta["explicit_slot"] or any(reason.startswith("keyword:") for reason in reasons)) if not has_strong_anchor: continue selected.append((intent, score, reasons, signal_meta)) used_families.add(family) if len(selected) >= 4: break if len(selected) < 2: return [] selected.sort(key=lambda item: (_signal_position(clause, item[3]), -item[1])) return selected def _signal_position(clause: str, signal_meta: dict[str, Any]) -> int: candidates = [] for token in list(signal_meta.get("object_hits") or []) + list(signal_meta.get("action_hits") or []): if not token: continue position = clause.find(token) if position >= 0: candidates.append(position) if not candidates: return len(clause) + 1 return min(candidates) def _split_clauses(text: str) -> list[str]: clauses = re_split(text) return [item for item in clauses if item] def _has_complex_pattern(text: str) -> bool: patterns = ("然后", "再", "并且", "并", "同时", "如果", "若", "先", "后", "顺便", "接着", "以及", "和") return any(token in text for token in patterns) def _has_conditional_pattern(text: str) -> bool: return any(token in text for token in ("如果", "若", "还没", "未发货", "没发货")) 
def _apply_conditional_hints(text: str, steps: list[PlannedStep]) -> list[PlannedStep]:
    """Gate a cancel-order step on the order still being unshipped.

    When ``text`` mentions an unshipped condition (还没发货 / 未发货 / 没发货)
    and the plan contains a ``cs_query_order`` step followed later by a
    ``cs_cancel_order`` step, the first such cancel step is changed as follows:

    * it depends on the first query step;
    * it is conditioned on that step's ``order_status`` equalling
      ``pending_shipment``;
    * it is marked as requiring confirmation.

    A ``depends_on`` or ``condition`` that is already set on the cancel step
    is left untouched.  Steps are referenced by 1-based position
    (``index + 1``).

    Args:
        text: The raw user utterance.
        steps: The planned steps.  Modified in place.

    Returns:
        The same ``steps`` list object.
    """
    if len(steps) < 2:
        return steps

    has_unshipped_condition = any(token in text for token in ("还没发货", "未发货", "没发货"))
    query_index = next((index for index, step in enumerate(steps) if step.intent_id == "cs_query_order"), None)
    cancel_index = next((index for index, step in enumerate(steps) if step.intent_id == "cs_cancel_order"), None)

    # The guard only makes sense when the status lookup runs before the cancel.
    if (
        has_unshipped_condition
        and query_index is not None
        and cancel_index is not None
        and cancel_index > query_index
    ):
        cancel_step = steps[cancel_index]
        if not cancel_step.depends_on:
            cancel_step.depends_on = [query_index + 1]
        if not cancel_step.condition:
            cancel_step.condition = {
                "source_step": query_index + 1,
                "field": "order_status",
                "operator": "equals",
                "value": "pending_shipment",
                "description": "仅在订单未发货时取消",
            }
        cancel_step.requires_confirmation = True
    return steps


# Clause separators: sequencing connectives plus ASCII *and* full-width
# commas/semicolons.  Chinese input normally uses the full-width forms.
# "并且" precedes "并" so the longer connective wins (leftmost alternative).
_CLAUSE_SPLIT_PATTERN = re.compile(r"(?:然后|并且|同时|再|顺便|接着|并|,|，|;|；)")


def re_split(text: str) -> list[str]:
    """Split ``text`` into clauses on connectives and comma/semicolon punctuation.

    Returns the stripped, non-empty pieces in their original order.
    """
    stripped = (chunk.strip() for chunk in _CLAUSE_SPLIT_PATTERN.split(text))
    return [chunk for chunk in stripped if chunk]


def _looks_like_parallel_clause(text: str) -> bool:
    """Return True if ``text`` contains a coordinating token (和 / 以及 / 并 / 同时 / 还有)."""
    return any(token in text for token in ("和", "以及", "并", "同时", "还有"))


def _serialize_clause_analysis(parse_result: MultiIntentParseResult) -> list[dict[str, Any]]:
    """Convert each parsed clause into a plain dict for result metadata.

    Scores are rounded to 4 decimal places.  ``slots`` and ``candidates`` are
    included by reference, not copied.
    """
    return [
        {
            "clause_text": clause.clause_text,
            "selected_intent_id": clause.selected_intent_id,
            "score": round(clause.score, 4),
            "slots": clause.slots,
            "reason": clause.reason,
            "candidates": clause.candidates,
        }
        for clause in parse_result.clauses
    ]


def _serialize_multi_intent_detector(
    detector_result: MultiIntentDetectionResult | None,
) -> dict[str, Any] | None:
    """Convert a multi-intent detector result into a plain dict, or return None.

    Candidate and raw scores are rounded to 4 decimal places.  Only the first
    8 ``raw_scores`` entries are kept.  Missing ``intent_id`` / ``label`` values
    in a raw score become ``""`` and a missing ``score`` becomes ``0.0``.
    """
    if detector_result is None:
        return None
    return {
        "detected": detector_result.detected,
        "reason": detector_result.reason,
        "backend_name": detector_result.backend_name,
        "error_message": detector_result.error_message,
        "candidates": [
            {
                "intent_id": candidate.intent_id,
                "score": round(candidate.score, 4),
                "label": candidate.label,
            }
            for candidate in detector_result.candidates
        ],
        "raw_scores": [
            {
                "intent_id": str(item.get("intent_id") or ""),
                "label": str(item.get("label") or ""),
                "score": round(float(item.get("score", 0.0)), 4),
            }
            for item in detector_result.raw_scores[:8]
        ],
    }


# Lexical hints per intent id, used for keyword-style clause scoring.
# Keys of each entry:
#   family         - groups mutually exclusive intents (e.g. on/off of the
#                    same device).  _collect_parallel_clause_candidates reads
#                    a "family" value from signal_meta; presumably it is
#                    sourced from here (NOTE(review): confirm).
#   actions        - verb tokens for the intent.
#   objects        - target-object tokens.
#   qualifiers     - optional extra tokens.
#   shared_context - optional tokens; presumably tokens that may be inherited
#                    by sibling clauses (NOTE(review): confirm with the scorer).
_INTENT_HINTS: dict[str, dict[str, Any]] = {
    "cabin_ac_on": {
        "family": "ac_power",
        "actions": ("打开", "开启", "启动", "开"),
        "objects": ("空调", "ac", "冷气", "冷风"),
    },
    "cabin_ac_off": {
        "family": "ac_power",
        "actions": ("关闭", "关掉", "关", "停掉"),
        "objects": ("空调", "ac", "冷气", "冷风"),
    },
    "cabin_set_ac": {
        "family": "ac_temperature",
        "actions": ("调到", "设到", "设成", "调低", "调高", "升高", "降低"),
        "objects": ("空调", "温度", "度", "冷气"),
        "qualifiers": ("冷一点", "热一点"),
    },
    "cabin_fan_up": {
        "family": "fan_speed",
        "actions": ("调大", "调高", "加大"),
        "objects": ("风量", "风"),
    },
    "cabin_fan_down": {
        "family": "fan_speed",
        "actions": ("调小", "调低", "减小"),
        "objects": ("风量", "风"),
    },
    "cabin_defog_front_on": {
        "family": "defog_front",
        "actions": ("打开", "开启", "开", "除", "除雾"),
        "objects": ("前挡", "前挡风", "前窗", "前玻璃"),
    },
    "cabin_defog_rear_on": {
        "family": "defog_rear",
        "actions": ("打开", "开启", "开", "除", "除雾"),
        "objects": ("后挡", "后挡风", "后窗", "后玻璃"),
    },
    # Lowering a car window (降下/降下来) opens it; raising it (升起来) closes it.
    "cabin_window_open": {
        "family": "window",
        "actions": ("打开", "开", "降下来", "降下"),
        "objects": ("车窗", "窗户", "窗"),
    },
    "cabin_window_close": {
        "family": "window",
        "actions": ("关闭", "关上", "关掉", "升起来", "关"),
        "objects": ("车窗", "窗户", "窗"),
    },
    "cabin_sunroof_open": {
        "family": "sunroof",
        "actions": ("打开", "开", "翘起", "翘起来"),
        "objects": ("天窗",),
    },
    "cabin_sunroof_close": {
        "family": "sunroof",
        "actions": ("关闭", "关上", "合上", "关掉", "合起来"),
        "objects": ("天窗",),
    },
    "cabin_lock_doors": {
        "family": "door_lock",
        "actions": ("锁上", "锁住", "上锁", "锁"),
        "objects": ("车门", "门", "车锁"),
    },
    "cabin_unlock_doors": {
        "family": "door_lock",
        "actions": ("解锁", "开锁"),
        "objects": ("车门", "门", "车锁"),
    },
    "cabin_nav_to": {
        "family": "nav",
        "actions": ("导航", "去", "到", "带我去"),
        "objects": ("公司", "机场", "家", "目的地"),
    },
    "cabin_nav_cancel": {
        "family": "nav",
        "actions": ("取消", "结束", "停止", "退出", "关掉"),
        "objects": ("导航", "路线"),
    },
    "cabin_play_music": {
        "family": "music_playback",
        "actions": ("播放", "来点", "放点", "听", "来首", "来一首"),
        "objects": ("音乐", "歌", "轻音乐", "摇滚", "流行"),
    },
    "cabin_pause_music": {
        "family": "music_playback",
        "actions": ("暂停", "停掉", "停止"),
        "objects": ("音乐", "歌", "播放"),
    },
    "cabin_next_track": {
        "family": "track_switch",
        "actions": ("下一首", "切到下一首", "切歌", "换一首"),
        "objects": ("歌", "歌曲", "音乐"),
    },
    "cabin_previous_track": {
        "family": "track_switch",
        "actions": ("上一首", "切回上一首", "回到上一首"),
        "objects": ("歌", "歌曲", "音乐"),
    },
    "cabin_volume_up": {
        "family": "volume",
        "actions": ("调大", "加大", "开大", "调高"),
        "objects": ("音量", "声音", "媒体音量"),
    },
    "cabin_volume_down": {
        "family": "volume",
        "actions": ("调小", "压低", "关小", "调低"),
        "objects": ("音量", "声音", "媒体音量"),
    },
    "cabin_volume_mute": {
        "family": "volume",
        "actions": ("静音", "关掉", "关闭"),
        "objects": ("音量", "声音", "音响"),
    },
    "cabin_lights_on": {
        "family": "lights",
        "actions": ("打开", "开", "点亮"),
        "objects": ("灯", "车灯", "大灯"),
    },
    "cabin_lights_off": {
        "family": "lights",
        "actions": ("关闭", "关掉", "关", "熄了"),
        "objects": ("灯", "车灯", "大灯"),
    },
    "cabin_seat_heat_on": {
        "family": "seat_heat",
        "actions": ("打开", "开启", "开"),
        "objects": ("座椅加热",),
    },
    "cabin_seat_heat_off": {
        "family": "seat_heat",
        "actions": ("关闭", "关掉", "关"),
        "objects": ("座椅加热",),
    },
    "cabin_mirror_fold": {
        "family": "mirror",
        "actions": ("折叠", "收起"),
        "objects": ("后视镜",),
    },
    "cabin_mirror_unfold": {
        "family": "mirror",
        "actions": ("展开", "打开"),
        "objects": ("后视镜",),
    },
    "cabin_wiper_on": {
        "family": "wiper",
        "actions": ("打开", "启动", "开始"),
        "objects": ("雨刷", "雨刮", "雨刮器"),
    },
    "cabin_wiper_off": {
        "family": "wiper",
        "actions": ("关闭", "停掉", "停止"),
        "objects": ("雨刷", "雨刮", "雨刮器"),
    },
    "cs_query_order": {
        "family": "order_query",
        "actions": ("查", "查询", "看看"),
        "objects": ("订单",),
        "shared_context": ("订单",),
    },
    "cs_query_logistics": {
        "family": "order_logistics",
        "actions": ("查", "查询", "看看"),
        "objects": ("物流", "快递"),
        "shared_context": ("物流", "快递", "订单"),
    },
    "cs_cancel_order": {
        "family": "order_cancel",
        "actions": ("取消", "撤销", "撤单", "不要了"),
        "objects": ("订单",),
        "shared_context": ("订单",),
    },
}