Files
2026-06-11 16:28:00 +08:00

1348 lines
52 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from typing import Any, Protocol
from urllib import error, request
from app.services.classifier import ClassificationResult, IntentClassifier
from app.services.joint_nlu import JointBertNLU
from app.services.multi_intent_detector import MultiIntentDetectionResult, MultiIntentDetector
from app.schemas.configuration import WorkflowTemplateDefinition, WorkflowTemplatesConfig
from app.schemas.intent import IntentDefinition
@dataclass
class PlannedStep:
    """One step of a planned workflow, as produced by the planners in this module."""

    # Identifier of the intent this step executes.
    intent_id: str
    # Slot values attached to the step, keyed by slot name.
    slots: dict[str, Any] = field(default_factory=dict)
    # Step numbers this step depends on (presumably 1-based, matching the
    # planner prompt schema — TODO confirm against the executor).
    depends_on: list[int] = field(default_factory=dict) if False else field(default_factory=list)
    # Optional guard for the step; planners fill keys such as
    # field / operator / value / description / source_step.
    condition: dict[str, Any] = field(default_factory=dict)
    # True when the step should be confirmed before it runs.
    requires_confirmation: bool = False
    # Free-text explanation of why the step was planned.
    reason: str | None = None
@dataclass
class PlanningResult:
    """Outcome of a single planner invocation."""

    # Whether the planner produced a plan it considers usable.
    accepted: bool
    # Workflow shape; planners in this module emit "single", "sequence",
    # "conditional" or a template-defined value.
    workflow_type: str = "single"
    # Planned steps, in execution order.
    steps: list[PlannedStep] = field(default_factory=list)
    # Name of the planner or model that produced the result.
    model_name: str = "heuristic-planner"
    # Backend label (e.g. "local", "local-template", "dashscope").
    backend: str = "local"
    # Human-readable explanation of the outcome.
    reason: str | None = None
    # Error detail when the planner failed or fell back.
    error_message: str | None = None
    # Raw text returned by a remote planner, when there is one.
    raw_response: str | None = None
    # Diagnostic payload (clause analysis, normalized steps, ...).
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class ClauseParseResult:
    """Result of matching one clause of the utterance against the intent catalog."""

    # The clause text that was analysed.
    clause_text: str
    # Intent chosen for the clause, or None when nothing scored high enough.
    selected_intent_id: str | None = None
    # Fused score of the selected intent.
    score: float = 0.0
    # Slots extracted from the clause for the selected intent.
    slots: dict[str, Any] = field(default_factory=dict)
    # Short explanation of the match (or of the rejection).
    reason: str | None = None
    # Top scored candidates, serialised as dicts for diagnostics.
    candidates: list[dict[str, Any]] = field(default_factory=list)
@dataclass
class MultiIntentParseResult:
    """Aggregated clause-level analysis of one utterance."""

    # One entry per parsed clause candidate, matched or not.
    clauses: list[ClauseParseResult] = field(default_factory=list)
    # "single", "sequence" or "conditional".
    workflow_type: str = "single"
    # True when at least two executable clauses were found.
    detected: bool = False
    # Human-readable summary of the detection outcome.
    reason: str | None = None
    # Raw output of the optional multi-intent detector.
    detector: MultiIntentDetectionResult | None = None

    @property
    def matched_clauses(self) -> list[ClauseParseResult]:
        """Return the clauses that resolved to an intent, in original order."""
        return list(filter(lambda entry: entry.selected_intent_id, self.clauses))

    @property
    def matched_ids(self) -> list[str]:
        """Return the intent ids of the matched clauses, in original order."""
        return [entry.selected_intent_id for entry in self.matched_clauses]
class WorkflowPlanner(Protocol):
    """Structural interface shared by every planner in this module."""

    def plan(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None = None,
    ) -> PlanningResult:
        """Turn the utterance `text` into a PlanningResult using the intent catalog.

        `context` is an optional free-form dict; the local planners in this
        file ignore it, the cloud planner forwards it to the model.
        """
        ...
class TemplateWorkflowPlanner:
    """Planner that maps the matched clause intents onto configured workflow templates."""

    def __init__(
        self,
        templates: WorkflowTemplatesConfig | None = None,
        clause_classifier: IntentClassifier | None = None,
        multi_intent_detector: MultiIntentDetector | None = None,
        joint_nlu: JointBertNLU | None = None,
        classifier_weight: float = 1.6,
        model_only_threshold: float = 0.62,
    ) -> None:
        """Store the template configuration and the optional NLU collaborators."""
        self._templates = templates or WorkflowTemplatesConfig()
        self._clause_classifier = clause_classifier
        self._multi_intent_detector = multi_intent_detector
        self._joint_nlu = joint_nlu
        self._classifier_weight = classifier_weight
        self._model_only_threshold = model_only_threshold

    def plan(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None = None,
    ) -> PlanningResult:
        """Return the plan of the first template whose intent sequence matches.

        `context` is accepted for interface compatibility and is not used.
        The result is rejected when no templates are configured, when fewer
        than two clauses matched an intent, or when no template matches.
        """
        if not self._templates.templates:
            return PlanningResult(
                accepted=False,
                model_name="template-planner",
                backend="local-template",
                reason="no workflow templates configured",
            )

        analysis = _analyze_multi_intent(
            text,
            intents,
            clause_classifier=self._clause_classifier,
            multi_intent_detector=self._multi_intent_detector,
            joint_nlu=self._joint_nlu,
            classifier_weight=self._classifier_weight,
            model_only_threshold=self._model_only_threshold,
        )
        intent_ids = analysis.matched_ids
        if len(intent_ids) < 2:
            return self._rejected(
                analysis,
                intent_ids,
                "not enough matched clauses for workflow template",
            )

        # First matching template wins, in configuration order.
        chosen = next(
            (
                candidate
                for candidate in self._templates.templates
                if self._matches_template(text, intent_ids, candidate)
            ),
            None,
        )
        if chosen is None:
            return self._rejected(
                analysis,
                intent_ids,
                "no workflow template matched the current clause intents",
            )

        planned = self._build_steps(chosen, analysis)
        return PlanningResult(
            accepted=True,
            workflow_type=chosen.workflow_type,
            steps=planned,
            model_name="template-planner",
            backend="local-template",
            reason=f"matched workflow template: {chosen.template_id}",
            metadata={
                "template_id": chosen.template_id,
                "input_clauses": [item.clause_text for item in analysis.clauses],
                "matched_intents": intent_ids,
                "clause_analysis": _serialize_clause_analysis(analysis),
                "multi_intent_detected": analysis.detected,
                "multi_intent_detector": _serialize_multi_intent_detector(analysis.detector),
                "normalized_steps": [
                    {
                        "intent_id": step.intent_id,
                        "slots": step.slots,
                        "depends_on": step.depends_on,
                        "condition": step.condition,
                        "requires_confirmation": step.requires_confirmation,
                    }
                    for step in planned
                ],
            },
        )

    def _rejected(
        self,
        analysis: MultiIntentParseResult,
        intent_ids: list[str],
        reason: str,
    ) -> PlanningResult:
        """Build a rejected result carrying the clause-analysis diagnostics."""
        return PlanningResult(
            accepted=False,
            model_name="template-planner",
            backend="local-template",
            reason=reason,
            metadata={
                "matched_intents": intent_ids,
                "input_clauses": [item.clause_text for item in analysis.clauses],
                "clause_analysis": _serialize_clause_analysis(analysis),
                "multi_intent_detected": analysis.detected,
                "multi_intent_detector": _serialize_multi_intent_detector(analysis.detector),
            },
        )

    def _matches_template(self, text: str, matched_ids: list[str], template: WorkflowTemplateDefinition) -> bool:
        """Return True when all trigger keywords occur in `text` and the matched
        intent ids start with the template's intent sequence."""
        keywords = template.trigger_keywords
        if keywords:
            for keyword in keywords:
                if keyword not in text:
                    return False
        expected = template.intent_sequence
        return matched_ids[: len(expected)] == expected

    def _build_steps(
        self,
        template: WorkflowTemplateDefinition,
        parse_result: MultiIntentParseResult,
    ) -> list[PlannedStep]:
        """Create one PlannedStep per template intent, pairing it positionally
        with the matched clause and the template's step override (if any)."""
        matched = parse_result.matched_clauses
        planned: list[PlannedStep] = []
        for position, intent_id in enumerate(template.intent_sequence):
            clause_result = matched[position] if position < len(matched) else None
            overrides = template.step_overrides
            override = overrides[position] if position < len(overrides) else None

            if clause_result is None:
                clause_text = ""
                reason_intent = intent_id
            else:
                clause_text = clause_result.clause_text
                reason_intent = clause_result.selected_intent_id

            # Without a usable clause, extract slots from all clauses joined together.
            source_text = clause_text or " ".join(item.clause_text for item in parse_result.clauses)
            slots = _extract_slots_for_intent(source_text, intent_id, joint_nlu=self._joint_nlu)

            if override is None:
                depends_on: list[int] = []
                condition: dict[str, Any] = {}
                needs_confirmation = False
            else:
                depends_on = list(override.depends_on)
                condition = dict(override.condition)
                needs_confirmation = bool(override.requires_confirmation)

            planned.append(
                PlannedStep(
                    intent_id=intent_id,
                    slots=slots,
                    depends_on=depends_on,
                    condition=condition,
                    requires_confirmation=needs_confirmation,
                    reason=f"workflow template matched clause {position + 1}: {reason_intent}",
                )
            )
        return planned
class CompositeWorkflowPlanner:
    """Try several planners in order and return the first accepted plan."""

    def __init__(self, planners: list[WorkflowPlanner]) -> None:
        """Remember the planners; they are consulted in list order."""
        self._planners = planners

    def plan(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None = None,
    ) -> PlanningResult:
        """Return the first accepted result, else the last planner's result.

        With no planners configured, a rejected placeholder result is returned.
        """
        outcome = PlanningResult(accepted=False, reason="no planner configured")
        for planner in self._planners:
            outcome = planner.plan(text, intents, context)
            if outcome.accepted:
                break
        return outcome
class HeuristicWorkflowPlanner:
    """Local planner that turns every matched clause into one workflow step."""

    def __init__(
        self,
        clause_classifier: IntentClassifier | None = None,
        multi_intent_detector: MultiIntentDetector | None = None,
        joint_nlu: JointBertNLU | None = None,
        classifier_weight: float = 1.6,
        model_only_threshold: float = 0.62,
    ) -> None:
        """Store the optional NLU collaborators and the score-fusion settings."""
        self._clause_classifier = clause_classifier
        self._multi_intent_detector = multi_intent_detector
        self._joint_nlu = joint_nlu
        self._classifier_weight = classifier_weight
        self._model_only_threshold = model_only_threshold

    def plan(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None = None,
    ) -> PlanningResult:
        """Plan from the clause analysis; accepted only with two or more steps.

        `context` is accepted for interface compatibility and is not used.
        """
        analysis = _analyze_multi_intent(
            text,
            intents,
            clause_classifier=self._clause_classifier,
            multi_intent_detector=self._multi_intent_detector,
            joint_nlu=self._joint_nlu,
            classifier_weight=self._classifier_weight,
            model_only_threshold=self._model_only_threshold,
        )

        looks_like_plan = analysis.detected or _has_complex_pattern(text)
        if not looks_like_plan:
            return PlanningResult(
                accepted=False,
                model_name="heuristic-planner",
                backend="local-heuristic",
                reason="single command or no explicit planning pattern detected",
                metadata=self._diagnostics(analysis),
            )

        steps: list[PlannedStep] = [
            PlannedStep(
                intent_id=clause.selected_intent_id,
                slots=clause.slots.copy(),
                reason=clause.reason or f"heuristic parse for clause: {clause.clause_text}",
            )
            for clause in analysis.matched_clauses
            if clause.selected_intent_id is not None
        ]

        workflow_type = analysis.workflow_type
        if workflow_type == "conditional":
            steps = _apply_conditional_hints(text, steps)

        if len(steps) < 2:
            # A plan pattern was seen but too few clauses resolved to intents.
            return PlanningResult(
                accepted=False,
                workflow_type=workflow_type,
                steps=steps,
                model_name="heuristic-planner",
                backend="local-heuristic",
                reason="planning detected but local heuristic could not produce enough executable steps",
                metadata=self._diagnostics(analysis),
            )

        metadata = self._diagnostics(analysis)
        metadata["normalized_steps"] = [
            {
                "intent_id": step.intent_id,
                "slots": step.slots,
                "depends_on": step.depends_on,
                "condition": step.condition,
                "requires_confirmation": step.requires_confirmation,
                "reason": step.reason,
            }
            for step in steps
        ]
        return PlanningResult(
            accepted=True,
            workflow_type=workflow_type,
            steps=steps,
            model_name="heuristic-planner",
            backend="local-heuristic",
            reason=analysis.reason or "heuristic planner split the request into multiple executable clauses",
            metadata=metadata,
        )

    def _diagnostics(self, analysis: MultiIntentParseResult) -> dict[str, Any]:
        """Return the clause-analysis metadata shared by every result of this planner."""
        return {
            "input_clauses": [item.clause_text for item in analysis.clauses],
            "clause_analysis": _serialize_clause_analysis(analysis),
            "multi_intent_detected": analysis.detected,
            "multi_intent_detector": _serialize_multi_intent_detector(analysis.detector),
        }
class DashScopeWorkflowPlanner:
    """Planner that asks a chat-completions endpoint for a JSON plan.

    Any configuration, transport or parsing failure is routed to the
    fallback planner (a HeuristicWorkflowPlanner unless one is supplied).
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        model_name: str,
        timeout_seconds: float = 6.0,
        fallback: WorkflowPlanner | None = None,
        joint_nlu: JointBertNLU | None = None,
    ) -> None:
        """Store the endpoint settings; a trailing slash on `base_url` is dropped."""
        self._base_url = base_url.rstrip("/")
        self._api_key = api_key
        self._model_name = model_name
        self._timeout_seconds = timeout_seconds
        self._fallback = fallback or HeuristicWorkflowPlanner(joint_nlu=joint_nlu)
        self._joint_nlu = joint_nlu

    def plan(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None = None,
    ) -> PlanningResult:
        """Request a plan from the remote model and normalize its steps.

        Falls back to the local planner when the planner is not configured,
        the request fails, the reply is empty, or the reply is not valid JSON.
        """
        # Local import: http.client is not part of this module's import block.
        from http.client import HTTPException

        if not self._base_url or not self._api_key or not self._model_name:
            return self._fallback_result(
                text,
                intents,
                context,
                reason="planner is not configured",
                error_message="set AGENT_PLANNER_BASE_URL / AGENT_PLANNER_API_KEY / AGENT_PLANNER_MODEL_NAME",
            )
        payload = {
            "model": self._model_name,
            "temperature": 0.1,
            "enable_thinking": False,
            "max_tokens": 320,
            "messages": [
                {
                    "role": "system",
                    "content": self._system_prompt(intents),
                },
                {
                    "role": "user",
                    "content": json.dumps(
                        {
                            "text": text,
                            "clauses": _split_clauses(text),
                            "context": context or {},
                        },
                        ensure_ascii=False,
                    ),
                },
            ],
        }
        req = request.Request(
            self._endpoint(),
            data=json.dumps(payload).encode("utf-8"),
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self._api_key}",
            },
            method="POST",
        )
        try:
            with request.urlopen(req, timeout=self._timeout_seconds) as response:
                data = json.loads(response.read().decode("utf-8"))
        except (error.URLError, HTTPException, OSError, ValueError) as exc:
            # URLError and timeouts are OSError subclasses; OSError and
            # HTTPException additionally cover failures raised while the body
            # is being read (connection reset, incomplete read), which urlopen
            # does not wrap. ValueError covers bad JSON and bad UTF-8.
            return self._fallback_result(
                text,
                intents,
                context,
                reason="cloud planner request failed",
                error_message=str(exc),
            )
        content = self._extract_content(data)
        if not content:
            return self._fallback_result(
                text,
                intents,
                context,
                reason="cloud planner returned empty content",
                raw_response=json.dumps(data, ensure_ascii=False),
            )
        try:
            plan_data = self._parse_json_content(content)
        except ValueError as exc:
            return self._fallback_result(
                text,
                intents,
                context,
                reason="cloud planner returned invalid json",
                error_message=str(exc),
                raw_response=content,
            )
        steps, normalization_meta = self._normalize_steps(plan_data, text, intents)
        # A plan whose steps were all dropped during normalization (unknown
        # intent ids, malformed entries) cannot be executed, so it is never
        # reported as accepted even if the model said so.
        accepted = bool(plan_data.get("accepted", True)) and bool(steps)
        workflow_type = str(plan_data.get("workflow_type", "single" if len(steps) <= 1 else "sequence"))
        return PlanningResult(
            accepted=accepted,
            workflow_type=workflow_type,
            steps=steps,
            model_name=self._model_name,
            backend="dashscope",
            reason=str(plan_data.get("reason", "")).strip() or "cloud planner parsed successfully",
            raw_response=content,
            metadata={
                "input_clauses": _split_clauses(text),
                "parsed_plan": plan_data,
                "normalized_steps": normalization_meta,
            },
        )

    def _endpoint(self) -> str:
        """Return the chat-completions URL, appending the path when it is missing."""
        if self._base_url.endswith("/chat/completions"):
            return self._base_url
        return f"{self._base_url}/chat/completions"

    def _system_prompt(self, intents: list[IntentDefinition]) -> str:
        """Build the system prompt, embedding a compact catalog of the intents."""
        intent_catalog = [
            {
                "intent_id": intent.intent_id,
                "required_slots": intent.required_slots,
                "keywords": intent.keywords[:4],
            }
            for intent in intents
        ]
        return (
            "Plan user requests into executable steps for a cabin/service assistant. "
            "Return strict JSON only, no markdown. "
            "Use only intent_id values from the catalog. "
            "If the request is a single clear command, return one step. "
            "If it contains multiple commands, return sequence steps. "
            "Fill slots whenever the value is explicit in the user text. "
            "Examples: order_id like A123456, destination like 公司/机场, temperature as integer, music may use song or genre such as 轻音乐. "
            "Each step should only contain slots relevant to that intent. "
            "For conditional requests, mark workflow_type as conditional and attach depends_on plus condition. "
            "Use requires_confirmation=true for risky actions such as canceling an order. "
            'Schema: {"accepted":true,"workflow_type":"single|sequence|conditional","reason":"brief","steps":[{"intent_id":"...","slots":{},"depends_on":[1],"condition":{"field":"order_status","operator":"equals","value":"pending_shipment","description":"only cancel if not shipped"},"requires_confirmation":true,"reason":"brief"}]}. '
            f"Catalog={json.dumps(intent_catalog, ensure_ascii=False, separators=(',', ':'))}"
        )

    def _extract_content(self, data: dict[str, Any]) -> str:
        """Return the text of the first choice's message, or "" when absent.

        Every level is type-checked so that an unexpected payload (non-dict
        body, non-dict choice, ``"message": null``) yields "" instead of
        raising AttributeError.
        """
        if not isinstance(data, dict):
            return ""
        choices = data.get("choices")
        if not isinstance(choices, list) or not choices:
            return ""
        first_choice = choices[0]
        if not isinstance(first_choice, dict):
            return ""
        message = first_choice.get("message")
        if not isinstance(message, dict):
            return ""
        content = message.get("content", "")
        if isinstance(content, str):
            return content.strip()
        if isinstance(content, list):
            # Multi-part content: concatenate the parts typed as "text".
            text_parts = [
                str(item.get("text", ""))
                for item in content
                if isinstance(item, dict) and item.get("type") == "text"
            ]
            return "".join(text_parts).strip()
        return ""

    def _parse_json_content(self, content: str) -> dict[str, Any]:
        """Parse the outermost ``{...}`` object found in `content`.

        Raises:
            ValueError: when no JSON object is present or it fails to parse
                (json.JSONDecodeError is a ValueError subclass).
        """
        cleaned = content.strip()
        if cleaned.startswith("```"):
            # Strip a markdown code fence and its optional "json" language tag.
            cleaned = cleaned.strip("`")
            if cleaned.startswith("json"):
                cleaned = cleaned[4:].strip()
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start < 0 or end < 0 or end <= start:
            raise ValueError("planner content does not contain a JSON object")
        return json.loads(cleaned[start : end + 1])

    def _normalize_steps(
        self,
        plan_data: dict[str, Any],
        text: str,
        intents: list[IntentDefinition],
    ) -> tuple[list[PlannedStep], list[dict[str, Any]]]:
        """Validate the model's steps and enrich their slots with local extraction.

        Returns the accepted steps together with one diagnostics dict per step.
        Steps that are not dicts or name an unknown intent are dropped.
        """
        intent_map = {intent.intent_id: intent for intent in intents}
        clauses = _split_clauses(text)
        raw_steps = plan_data.get("steps", [])
        steps: list[PlannedStep] = []
        normalization_meta: list[dict[str, Any]] = []
        if not isinstance(raw_steps, list):
            return steps, normalization_meta
        for index, raw_step in enumerate(raw_steps):
            if not isinstance(raw_step, dict):
                continue
            intent_id = str(raw_step.get("intent_id", "")).strip()
            if not intent_id or intent_id not in intent_map:
                continue
            cloud_slots = raw_step.get("slots", {})
            if not isinstance(cloud_slots, dict):
                cloud_slots = {}
            # Pair the step with the clause at the same position; fall back to
            # the whole utterance when the model returned more steps than clauses.
            clause_text = clauses[index] if index < len(clauses) else text
            clause_slots = _extract_slots_for_intent(clause_text, intent_id, joint_nlu=self._joint_nlu)
            full_text_slots = _extract_slots_for_intent(text, intent_id, joint_nlu=self._joint_nlu)
            normalized_slots = self._merge_slots(intent_id, cloud_slots, clause_slots, full_text_slots)
            steps.append(
                PlannedStep(
                    intent_id=intent_id,
                    slots=normalized_slots,
                    depends_on=self._normalize_depends_on(raw_step.get("depends_on")),
                    condition=self._normalize_condition(intent_id, raw_step.get("condition")),
                    requires_confirmation=bool(raw_step.get("requires_confirmation", False)),
                    reason=str(raw_step.get("reason", "")).strip() or None,
                )
            )
            normalization_meta.append(
                {
                    "intent_id": intent_id,
                    "clause_text": clause_text,
                    "cloud_slots": cloud_slots,
                    "clause_slots": clause_slots,
                    "full_text_slots": full_text_slots,
                    "normalized_slots": normalized_slots,
                    "depends_on": steps[-1].depends_on,
                    "condition": steps[-1].condition,
                    "requires_confirmation": steps[-1].requires_confirmation,
                }
            )
        inferred_type = str(plan_data.get("workflow_type", ""))
        if inferred_type == "conditional" or _has_conditional_pattern(text):
            steps = _apply_conditional_hints(text, steps)
            # Keep the diagnostics in sync with whatever the hints changed.
            for index, step in enumerate(steps):
                if index < len(normalization_meta):
                    normalization_meta[index]["depends_on"] = step.depends_on
                    normalization_meta[index]["condition"] = step.condition
                    normalization_meta[index]["requires_confirmation"] = step.requires_confirmation
        return steps, normalization_meta

    def _merge_slots(
        self,
        intent_id: str,
        cloud_slots: dict[str, Any],
        clause_slots: dict[str, Any],
        full_text_slots: dict[str, Any],
    ) -> dict[str, Any]:
        """Merge slot sources (full text < clause < cloud) and filter by intent.

        Empty values ("", None, []) never override an earlier source. For the
        intents listed below, only their allowed slot keys are kept.
        """
        merged: dict[str, Any] = {}
        for source in (full_text_slots, clause_slots, cloud_slots):
            for key, value in source.items():
                if value in ("", None, []):
                    continue
                merged[key] = value
        allowed_slot_keys = {
            "cs_query_order": {"order_id"},
            "cs_query_logistics": {"order_id"},
            "cs_cancel_order": {"order_id"},
            "cabin_nav_to": {"destination"},
            "cabin_set_ac": {"temperature"},
            "cabin_play_music": {"song", "genre"},
        }.get(intent_id)
        if allowed_slot_keys is None:
            return merged
        return {key: value for key, value in merged.items() if key in allowed_slot_keys}

    def _normalize_depends_on(self, raw_value: Any) -> list[int]:
        """Return the positive integers found in `raw_value`; anything else is dropped."""
        if not isinstance(raw_value, list):
            return []
        result: list[int] = []
        for item in raw_value:
            try:
                index = int(item)
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")), which json.loads can produce.
                continue
            if index > 0:
                result.append(index)
        return result

    def _normalize_condition(self, intent_id: str, raw_value: Any) -> dict[str, Any]:
        """Keep only the recognised, non-empty condition keys from the model output."""
        if not isinstance(raw_value, dict):
            return {}
        normalized: dict[str, Any] = {}
        for key in ("field", "operator", "value", "description"):
            if key in raw_value and raw_value[key] not in ("", None):
                normalized[key] = raw_value[key]
        source_step = raw_value.get("source_step")
        try:
            if source_step is not None and int(source_step) > 0:
                normalized["source_step"] = int(source_step)
        except (TypeError, ValueError, OverflowError):
            pass  # an unusable source_step is simply omitted
        if (
            intent_id == "cs_cancel_order"
            and normalized.get("field") == "order_status"
            and normalized.get("value") == "pending_shipment"
        ):
            # Canonical user-facing description for the "cancel if unshipped" guard.
            normalized["description"] = "仅在订单未发货时取消"
        return normalized

    def _fallback_result(
        self,
        text: str,
        intents: list[IntentDefinition],
        context: dict[str, Any] | None,
        reason: str,
        error_message: str | None = None,
        raw_response: str | None = None,
    ) -> PlanningResult:
        """Run the fallback planner and stamp its result with the failure details.

        The fallback's own reason is replaced by `reason`, and the backend is
        relabelled "dashscope-fallback".
        """
        fallback = self._fallback.plan(text, intents, context)
        fallback.reason = reason
        fallback.error_message = error_message
        fallback.raw_response = raw_response
        fallback.backend = "dashscope-fallback"
        return fallback
def _extract_slots_for_intent(
    text: str,
    intent_id: str,
    joint_nlu: JointBertNLU | None = None,
) -> dict[str, Any]:
    """Extract slot values for `intent_id` from `text`.

    A non-empty prediction from `joint_nlu` wins; otherwise regex and keyword
    rules are applied. Order id and temperature are looked for regardless of
    the intent; destination and music slots only for their own intents.
    """
    if joint_nlu is not None:
        predicted = joint_nlu.extract_slots_by_intent_id(text, intent_id)
        if predicted:
            return predicted
    slots: dict[str, Any] = {}
    cleaned_text = text.strip()
    if not cleaned_text:
        return slots
    # re.ASCII makes \b an ASCII word boundary. In the default Unicode mode
    # CJK characters count as word characters, so an id glued to Chinese text
    # (e.g. "订单A123456的物流") has no boundary around it and never matches.
    order_id_match = re.search(r"\b[A-Za-z]\d{5,}\b", cleaned_text, re.ASCII)
    if order_id_match:
        slots["order_id"] = order_id_match.group(0)
    temperature_match = re.search(r"(\d{2})\s*度", cleaned_text)
    if temperature_match:
        slots["temperature"] = int(temperature_match.group(1))
    if intent_id == "cabin_nav_to":
        destination = _extract_destination(cleaned_text)
        if destination:
            slots["destination"] = destination
    if intent_id == "cabin_play_music":
        slots.update(_extract_music_slots(cleaned_text))
    return slots
def _extract_destination(text: str) -> str | None:
    """Return the navigation destination mentioned in `text`, or None.

    The text following a navigation verb is cut at the first connector word
    or comma/semicolon (ASCII or full-width) and trimmed of punctuation.
    """
    # The separator alternation previously contained empty alternatives, which
    # match the empty string: re.split then split at position 0 and the
    # destination was always "". The alternatives are now an explicit
    # character class covering ASCII and full-width punctuation.
    # NOTE(review): the empty alternatives look like full-width characters
    # lost in transit — confirm the intended set against the upstream file.
    separator = r"(?:然后|并且|同时|再|[,,、;;])"
    for pattern in (
        r"导航去(?P<destination>.+)",
        r"导航到(?P<destination>.+)",
        r"带我去(?P<destination>.+)",
        r"去(?P<destination>.+)",
        r"到(?P<destination>.+)",
    ):
        match = re.search(pattern, text)
        if not match:
            continue
        destination = re.split(separator, match.group("destination"), maxsplit=1)[0]
        destination = destination.strip(" ,,。")
        if destination:
            return destination
    return None
def _extract_music_slots(text: str) -> dict[str, Any]:
    """Extract ``genre`` and/or ``song`` slots from a music request.

    A known genre anywhere in the text sets ``genre``. The text after a play
    trigger is then inspected: a genre there wins, otherwise what remains
    after trimming filler characters becomes ``song``.
    """
    slots: dict[str, Any] = {}
    genre_keywords = ("轻音乐", "摇滚", "古典", "民谣", "爵士", "流行", "儿歌")
    for genre in genre_keywords:
        if genre in text:
            slots["genre"] = genre
            break
    # The trigger list previously contained an empty string, which is "in"
    # every text and made text.split("") raise ValueError; it is dropped here.
    # The separator regex likewise had empty alternatives that split at
    # position 0 and always produced an empty target.
    # NOTE(review): both look like tokens lost in transit — restore the
    # intended trigger from the upstream file if there was one.
    separator = r"(?:然后|并且|同时|再|[,,、;;])"
    for trigger in ("播放", "来点", "放点", "来首", "来一首"):
        if trigger not in text:
            continue
        raw_target = text.split(trigger, maxsplit=1)[-1]
        raw_target = re.split(separator, raw_target, maxsplit=1)[0].strip(" ,,。")
        matched_genre = next((genre for genre in genre_keywords if genre in raw_target), None)
        if matched_genre:
            slots["genre"] = matched_genre
            break
        # Trim filler characters ("的", "一首", "歌曲", "吧", ...) from both ends.
        target = raw_target.strip(" 的一首首个歌曲音乐吧呀啊,。")
        if len(target) > 1 and target != "音乐":
            slots["song"] = target
            break
    if "song" in slots and slots["song"] in genre_keywords:
        slots["genre"] = slots.pop("song")
    return slots
def _analyze_multi_intent(
    text: str,
    intents: list[IntentDefinition],
    clause_classifier: IntentClassifier | None = None,
    multi_intent_detector: MultiIntentDetector | None = None,
    joint_nlu: JointBertNLU | None = None,
    classifier_weight: float = 1.6,
    model_only_threshold: float = 0.62,
) -> MultiIntentParseResult:
    """Split `text` into clauses, match each against the intents and summarise.

    The result is flagged as detected when at least two clauses matched an
    intent and the text either split into several clauses or contains a
    planning pattern.
    """
    detector_result = None
    if multi_intent_detector is not None:
        detector_result = multi_intent_detector.detect(text, intents)
    prior = _build_detector_prior(detector_result)

    clauses = _split_clauses(text)
    parsed: list[ClauseParseResult] = [
        clause_result
        for clause in clauses
        for clause_result in _parse_clause_candidates(
            clause,
            intents,
            full_text=text,
            clause_classifier=clause_classifier,
            detector_prior=prior,
            joint_nlu=joint_nlu,
            classifier_weight=classifier_weight,
            model_only_threshold=model_only_threshold,
        )
    ]

    matched_count = len([entry for entry in parsed if entry.selected_intent_id is not None])
    if matched_count >= 2:
        detected = len(clauses) >= 2 or _has_complex_pattern(text)
        workflow_type = "conditional" if _has_conditional_pattern(text) else "sequence"
    else:
        # Fewer than two executable clauses can never form a multi-step plan.
        detected = False
        workflow_type = "single"

    if detected:
        reason = f"detected {matched_count} executable clauses from multi-intent utterance"
    else:
        reason = "did not detect multiple executable clauses"

    return MultiIntentParseResult(
        clauses=parsed,
        workflow_type=workflow_type,
        detected=detected,
        reason=reason,
        detector=detector_result,
    )
def _parse_clause_candidates(
    clause: str,
    intents: list[IntentDefinition],
    full_text: str | None = None,
    clause_classifier: IntentClassifier | None = None,
    detector_prior: dict[str, float] | None = None,
    joint_nlu: JointBertNLU | None = None,
    classifier_weight: float = 1.6,
    model_only_threshold: float = 0.62,
) -> list[ClauseParseResult]:
    """Score one clause against every intent and return its parse result(s).

    Usually one result is returned. Several are returned when the clause
    carries parallel intents; an unmatched result is returned when the clause
    is blank or nothing reaches the 1.1 score threshold.
    """
    cleaned_clause = clause.strip()
    if not cleaned_clause:
        return [ClauseParseResult(clause_text=clause)]

    # Heuristic pass: keep only intents with a positive score.
    heuristic_rows: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = []
    for intent in intents:
        score, reasons, signal_meta = _score_intent_for_clause(cleaned_clause, intent, full_text=full_text)
        if score > 0:
            heuristic_rows.append((intent, score, reasons, signal_meta))

    classifier_result = _predict_clause_with_classifier(cleaned_clause, intents, clause_classifier)
    ranked = _fuse_clause_scores(
        clause=cleaned_clause,
        intents=intents,
        heuristic_scored=heuristic_rows,
        classifier_result=classifier_result,
        detector_prior=detector_prior or {},
        classifier_weight=classifier_weight,
        model_only_threshold=model_only_threshold,
    )

    # Diagnostics: the five best candidates with their score breakdown.
    candidates = []
    for intent, score, reasons, signal_meta in ranked[:5]:
        candidates.append(
            {
                "intent_id": intent.intent_id,
                "score": round(score, 4),
                "reason": "; ".join(reasons[:3]) if reasons else "heuristic clause score",
                "heuristic_score": round(float(signal_meta.get("heuristic_score", score)), 4),
                "model_score": round(float(signal_meta.get("model_score", 0.0)), 4),
            }
        )

    if not ranked or ranked[0][1] < 1.1:
        return [
            ClauseParseResult(
                clause_text=cleaned_clause,
                candidates=candidates,
                reason="no clause candidate scored above multi-intent threshold",
            )
        ]

    parallel = _collect_parallel_clause_candidates(cleaned_clause, ranked)
    winners = parallel if len(parallel) >= 2 else [ranked[0]]
    return [
        ClauseParseResult(
            clause_text=cleaned_clause,
            selected_intent_id=intent.intent_id,
            score=score,
            slots=_extract_slots_for_intent(cleaned_clause, intent.intent_id, joint_nlu=joint_nlu),
            reason="; ".join(reasons[:4]) if reasons else "heuristic clause match",
            candidates=candidates,
        )
        for intent, score, reasons, _unused_meta in winners
    ]
def _score_intent_for_clause(
    clause: str,
    intent: IntentDefinition,
    full_text: str | None = None,
) -> tuple[float, list[str], dict[str, Any]]:
    """Heuristically score how well `clause` matches `intent`.

    Returns ``(score, reasons, signal_meta)``: the additive score, the list
    of signals that contributed to it, and the signal metadata (with
    ``explicit_slot`` set when a temperature or order id was found).
    """
    score = 0.0
    reasons: list[str] = []
    signal_meta = _collect_intent_signal_meta(clause, intent, full_text=full_text)
    for keyword in intent.keywords:
        if keyword and keyword in clause:
            score += 2.0
            reasons.append(f"keyword:{keyword}")
    for example in intent.examples:
        if example and example in clause:
            score += 1.2
            reasons.append(f"example:{example}")
    action_hits = signal_meta["action_hits"]
    object_hits = signal_meta["object_hits"]
    qualifier_hits = signal_meta["qualifier_hits"]
    if action_hits:
        score += 0.8
        reasons.append(f"action:{action_hits[0]}")
    if object_hits:
        score += 0.8
        reasons.append(f"object:{object_hits[0]}")
    if action_hits and object_hits:
        # Extra credit when both the verb and its object are present.
        score += 0.9
        reasons.append("action_object_pair")
    if qualifier_hits:
        score += 0.5
        reasons.append(f"qualifier:{qualifier_hits[0]}")
    shared_context_hits = signal_meta["shared_context_hits"]
    if shared_context_hits and (action_hits or object_hits):
        score += 0.45
        reasons.append(f"context:{shared_context_hits[0]}")
    if intent.intent_id == "cabin_set_ac" and re.search(r"\d{2}\s*度", clause):
        score += 1.1
        reasons.append("explicit_temperature")
        signal_meta["explicit_slot"] = True
    # re.ASCII makes \b an ASCII word boundary; in the default Unicode mode an
    # order id adjacent to CJK text ("订单A123456") has no boundary and is missed.
    if intent.intent_id in {"cs_query_order", "cs_query_logistics", "cs_cancel_order"} and re.search(
        r"\b[A-Za-z]\d{5,}\b", clause, re.ASCII
    ):
        score += 0.7
        reasons.append("explicit_order_id")
        signal_meta["explicit_slot"] = True
    return score, reasons, signal_meta
def _collect_intent_signal_meta(
    clause: str,
    intent: IntentDefinition,
    full_text: str | None = None,
) -> dict[str, Any]:
    """Collect which hint tokens configured for `intent` occur in `clause`.

    Shared-context tokens are those that occur in `full_text` but not in the
    clause itself; they are only looked for when `full_text` differs from it.
    """
    hint_config = _INTENT_HINTS.get(intent.intent_id, {})

    def tokens_in_clause(group: str) -> list[str]:
        return [token for token in hint_config.get(group, ()) if token in clause]

    context_hits: list[str] = []
    if full_text and full_text != clause:
        for token in hint_config.get("shared_context", ()):
            if token in full_text and token not in clause:
                context_hits.append(token)

    return {
        "action_hits": tokens_in_clause("actions"),
        "object_hits": tokens_in_clause("objects"),
        "qualifier_hits": tokens_in_clause("qualifiers"),
        "shared_context_hits": context_hits,
        "explicit_slot": False,
        "family": str(hint_config.get("family") or intent.intent_id),
    }
def _predict_clause_with_classifier(
    clause: str,
    intents: list[IntentDefinition],
    clause_classifier: IntentClassifier | None,
) -> ClassificationResult | None:
    """Run the optional clause classifier, returning None if absent or failing.

    The classifier is a best-effort signal, so a failure must not abort
    planning; it is logged rather than silently discarded.
    """
    if clause_classifier is None:
        return None
    try:
        return clause_classifier.predict(clause, intents)
    except Exception:
        # Local import: logging is not part of this module's import block.
        import logging

        logging.getLogger(__name__).warning(
            "clause classifier failed; continuing with heuristic scores only",
            exc_info=True,
        )
        return None
def _fuse_clause_scores(
    clause: str,
    intents: list[IntentDefinition],
    heuristic_scored: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]],
    classifier_result: ClassificationResult | None,
    detector_prior: dict[str, float],
    classifier_weight: float,
    model_only_threshold: float,
) -> list[tuple[IntentDefinition, float, list[str], dict[str, Any]]]:
    """Combine heuristic, classifier and detector scores per intent.

    Returns ``(intent, fused_score, reasons, signal_meta)`` rows sorted by
    fused score, highest first. Intents without a heuristic score are kept
    only when the classifier alone is confident enough to bootstrap them.
    """
    intent_map = {intent.intent_id: intent for intent in intents}
    classifier_candidates = _extract_classifier_candidates(classifier_result, intent_map)

    # Seed one bucket per heuristically scored intent.
    buckets: dict[str, dict[str, Any]] = {
        intent.intent_id: {
            "intent": intent,
            "heuristic_score": score,
            "model_score": 0.0,
            "reasons": list(reasons),
            "signal_meta": dict(signal_meta),
        }
        for intent, score, reasons, signal_meta in heuristic_scored
    }

    # Fold the classifier candidates in, creating buckets for new intents.
    for candidate in classifier_candidates:
        intent = candidate["intent"]
        if intent is None:
            continue
        fresh_bucket = {
            "intent": intent,
            "heuristic_score": 0.0,
            "model_score": 0.0,
            "reasons": [],
            "signal_meta": _collect_intent_signal_meta(clause, intent),
        }
        bucket = buckets.setdefault(intent.intent_id, fresh_bucket)
        bucket["model_score"] = max(bucket["model_score"], candidate["score"])
        bucket["reasons"].append(f"bert:{candidate['score']:.3f}")

    rows: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = []
    for bucket in buckets.values():
        heuristic_score = float(bucket["heuristic_score"])
        model_score = float(bucket["model_score"])
        detector_score = float(detector_prior.get(bucket["intent"].intent_id, 0.0))
        reasons = bucket["reasons"]

        model_only = heuristic_score <= 0
        if model_only and not _allow_model_bootstrap(
            clause,
            model_score,
            classifier_candidates,
            model_only_threshold,
        ):
            continue

        total = heuristic_score + model_score * classifier_weight
        if detector_score > 0:
            total += detector_score * 0.55
            reasons.append(f"multi:{detector_score:.3f}")
        if heuristic_score > 0 and model_score > 0:
            # Bonus when the heuristic and the classifier agree on the intent.
            total += 0.25
            reasons.append("heuristic_bert_agree")

        signal_meta = dict(bucket["signal_meta"])
        signal_meta["heuristic_score"] = heuristic_score
        signal_meta["model_score"] = model_score
        signal_meta["detector_score"] = detector_score
        if model_only and model_score >= model_only_threshold:
            reasons.append("bert_bootstrap")
        rows.append((bucket["intent"], total, reasons, signal_meta))

    return sorted(rows, key=lambda row: row[1], reverse=True)
def _build_detector_prior(detector_result: MultiIntentDetectionResult | None) -> dict[str, float]:
    """Map intent id to detector score, keeping only strictly positive scores."""
    if detector_result is None:
        return {}
    return {
        candidate.intent_id: candidate.score
        for candidate in detector_result.candidates
        if not candidate.score <= 0
    }
def _extract_classifier_candidates(
    classifier_result: ClassificationResult | None,
    intent_map: dict[str, IntentDefinition],
) -> list[dict[str, Any]]:
    """Normalize classifier output into ``{intent, intent_id, score, label}`` dicts.

    ``raw_candidates`` are preferred; entries that are not dicts or name an
    intent missing from `intent_map` are skipped. When none survive, the
    ``(intent, score)`` pairs in ``candidates`` are used instead.
    """
    if classifier_result is None:
        return []
    normalized: list[dict[str, Any]] = []
    for item in classifier_result.raw_candidates or []:
        if not isinstance(item, dict):
            continue
        intent_id = str(item.get("intent_id") or "")
        intent = intent_map.get(intent_id)
        if intent is None:
            continue
        # A null or unparseable score counts as 0.0 instead of aborting the
        # whole clause analysis (the label already tolerates null the same way).
        try:
            score = float(item.get("score") or 0.0)
        except (TypeError, ValueError):
            score = 0.0
        normalized.append(
            {
                "intent": intent,
                "intent_id": intent_id,
                "score": score,
                "label": str(item.get("label") or intent_id),
            }
        )
    if normalized:
        return normalized
    if classifier_result.candidates:
        return [
            {
                "intent": intent,
                "intent_id": intent.intent_id,
                "score": float(score),
                "label": intent.intent_id,
            }
            for intent, score in classifier_result.candidates
        ]
    return []
def _allow_model_bootstrap(
    clause: str,
    model_score: float,
    classifier_candidates: list[dict[str, Any]],
    model_only_threshold: float,
) -> bool:
    """Decide whether a classifier-only match may stand without heuristic support.

    Requires a score at or above the threshold, a clause of at least four
    non-space characters (or one carrying an explicit temperature / order
    id), and a lead of at least 0.18 over the second classifier candidate.
    """
    if model_score < model_only_threshold:
        return False

    compact_clause = re.sub(r"\s+", "", clause)
    too_short = len(compact_clause) < 4
    if too_short and re.search(r"\d{2}\s*度|\b[A-Za-z]\d{5,}\b", clause) is None:
        return False

    if len(classifier_candidates) >= 2:
        runner_up_score = float(classifier_candidates[1].get("score", 0.0))
    else:
        runner_up_score = 0.0
    margin = model_score - runner_up_score
    return margin >= 0.18
def _collect_parallel_clause_candidates(
    clause: str,
    scored: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]],
) -> list[tuple[IntentDefinition, float, list[str], dict[str, Any]]]:
    """Pick several intents that are expressed side by side in one clause.

    Only runs when ``_looks_like_parallel_clause`` accepts the clause.
    Candidates are taken in the order given by *scored*, keeping those that

    * score at least 1.35,
    * belong to a family not already picked (the family comes from
      ``signal_meta["family"]``, falling back to the intent id), and
    * are anchored by object hits, an explicit slot, or a reason starting
      with ``"keyword:"``.

    At most four candidates are kept. Fewer than two means the clause is not
    treated as parallel and an empty list is returned. The result is ordered
    by where each candidate's signal first appears in the clause, with the
    higher score first on ties.
    """
    if not _looks_like_parallel_clause(clause):
        return []

    eligible = [row for row in scored if row[1] >= 1.35]
    picked: list[tuple[IntentDefinition, float, list[str], dict[str, Any]]] = []
    seen_families: set[str] = set()

    for intent, score, reasons, signal_meta in eligible:
        family = str(signal_meta.get("family") or intent.intent_id)
        if family in seen_families:
            continue
        anchored = (
            signal_meta["object_hits"]
            or signal_meta["explicit_slot"]
            or any(reason.startswith("keyword:") for reason in reasons)
        )
        if not anchored:
            continue
        picked.append((intent, score, reasons, signal_meta))
        seen_families.add(family)
        if len(picked) >= 4:
            break

    if len(picked) < 2:
        return []
    return sorted(picked, key=lambda row: (_signal_position(clause, row[3]), -row[1]))
def _signal_position(clause: str, signal_meta: dict[str, Any]) -> int:
    """Return the earliest index in *clause* at which any hit token occurs.

    Tokens come from ``signal_meta["object_hits"]`` followed by
    ``signal_meta["action_hits"]``; missing or empty lists and falsy tokens
    are ignored. When no token is found in the clause the result is
    ``len(clause) + 1``, i.e. a position past the end of the clause.
    """
    tokens = list(signal_meta.get("object_hits") or []) + list(signal_meta.get("action_hits") or [])
    offsets = [clause.find(token) for token in tokens if token]
    found = [offset for offset in offsets if offset >= 0]
    return min(found, default=len(clause) + 1)
def _split_clauses(text: str) -> list[str]:
    """Split *text* into clauses with ``re_split`` and drop any empty entries."""
    return list(filter(None, re_split(text)))
def _has_complex_pattern(text: str) -> bool:
    """Return True when *text* contains at least one multi-step marker token.

    A marker is any non-empty entry of the token tuple below (connectives
    such as "然后" / "并且" / "同时" and the conditional "如果").
    """
    # NOTE(review): the empty entries look like single-character tokens that
    # were lost somewhere (encoding/copy step) — confirm and restore them.
    patterns = ("然后", "", "并且", "", "同时", "如果", "", "", "", "顺便", "接着", "以及", "")
    # An empty string is a substring of every string, so empty entries must be
    # skipped; otherwise this check would be True for any input.
    return any(token and token in text for token in patterns)
def _has_conditional_pattern(text: str) -> bool:
    """Return True when *text* contains at least one conditional marker token.

    A marker is any non-empty entry of the token tuple below ("如果" or one
    of the not-yet-shipped phrases).
    """
    # NOTE(review): the empty entry looks like a single-character token that
    # was lost somewhere (encoding/copy step) — confirm and restore it.
    markers = ("如果", "", "还没", "未发货", "没发货")
    # Skip empty entries: "" is contained in every string and would make this
    # check unconditionally True.
    return any(token and token in text for token in markers)
def _apply_conditional_hints(text: str, steps: list[PlannedStep]) -> list[PlannedStep]:
    """Wire a "cancel only if not shipped yet" condition into a plan.

    Applies when *text* mentions a not-yet-shipped phrase and the plan has a
    ``cs_query_order`` step followed later by a ``cs_cancel_order`` step
    (the first occurrence of each is used). The cancel step is then modified
    in place:

    * ``depends_on`` defaults to the query step's one-based position;
    * ``condition`` defaults to "order_status equals pending_shipment",
      read from the query step;
    * ``requires_confirmation`` is switched on.

    Existing ``depends_on`` / ``condition`` values are left untouched. The
    same *steps* list is returned in every case.
    """
    if len(steps) < 2:
        return steps

    mentions_unshipped = any(marker in text for marker in ("还没发货", "未发货", "没发货"))
    if not mentions_unshipped:
        return steps

    # Remember the first position at which each intent id occurs.
    first_seen: dict[str, int] = {}
    for position, step in enumerate(steps):
        first_seen.setdefault(step.intent_id, position)
    query_at = first_seen.get("cs_query_order")
    cancel_at = first_seen.get("cs_cancel_order")
    if query_at is None or cancel_at is None or cancel_at <= query_at:
        return steps

    target = steps[cancel_at]
    query_ref = query_at + 1  # step references are one-based
    if not target.depends_on:
        target.depends_on = [query_ref]
    if not target.condition:
        target.condition = {
            "source_step": query_ref,
            "field": "order_status",
            "operator": "equals",
            "value": "pending_shipment",
            "description": "仅在订单未发货时取消",
        }
    target.requires_confirmation = True
    return steps
def re_split(text: str) -> list[str]:
    """Split *text* into trimmed, non-empty clauses.

    Clause boundaries are the connective words 然后 / 并且 / 同时 / 再 / 顺便 /
    接着 / 并 and the comma and semicolon in both ASCII and full-width form.
    Delimiters are not included in the result.
    """
    # The pattern must not contain empty alternatives ("||"): since Python 3.7
    # re.split() also splits on empty matches, which would cut the text
    # between every character. The full-width comma/semicolon are written as
    # escapes so they cannot be silently dropped by tooling again.
    # NOTE(review): the full-width delimiters are assumed to be what the two
    # empty alternatives originally held — confirm against the intended set.
    chunks = re.split(r"(?:然后|并且|同时|再|顺便|接着|并|,|\uFF0C|;|\uFF1B)", text)
    trimmed = (chunk.strip() for chunk in chunks)
    return [chunk for chunk in trimmed if chunk]
def _looks_like_parallel_clause(text: str) -> bool:
    """Return True when *text* contains a marker that joins parallel requests.

    A marker is any non-empty entry of the token tuple below (for example
    "以及", "同时" or "还有").
    """
    # NOTE(review): the empty entries look like single-character tokens that
    # were lost somewhere (encoding/copy step) — confirm and restore them.
    markers = ("", "以及", "", "同时", "还有")
    # Skip empty entries: "" is contained in every string and would make
    # every clause look parallel.
    return any(token and token in text for token in markers)
def _serialize_clause_analysis(parse_result: MultiIntentParseResult) -> list[dict[str, Any]]:
    """Turn every parsed clause into a plain dict, in clause order.

    Each dict carries the clause text, the selected intent id, the score
    rounded to four decimals, the slots, the reason and the candidate list.
    """
    serialized: list[dict[str, Any]] = []
    for parsed in parse_result.clauses:
        serialized.append(
            {
                "clause_text": parsed.clause_text,
                "selected_intent_id": parsed.selected_intent_id,
                "score": round(parsed.score, 4),
                "slots": parsed.slots,
                "reason": parsed.reason,
                "candidates": parsed.candidates,
            }
        )
    return serialized
def _serialize_multi_intent_detector(
    detector_result: MultiIntentDetectionResult | None,
) -> dict[str, Any] | None:
    """Convert a detector result into a plain dict, or None when absent.

    The dict holds the detection flag, reason, backend name and error
    message, every candidate (intent id, score rounded to four decimals,
    label) and at most the first eight raw score entries. Raw entries are
    normalized: missing ids/labels become empty strings and a missing score
    becomes 0.0.
    """
    if detector_result is None:
        return None

    candidate_rows = []
    for entry in detector_result.candidates:
        candidate_rows.append(
            {
                "intent_id": entry.intent_id,
                "score": round(entry.score, 4),
                "label": entry.label,
            }
        )

    raw_rows = []
    for raw in detector_result.raw_scores[:8]:
        raw_rows.append(
            {
                "intent_id": str(raw.get("intent_id") or ""),
                "label": str(raw.get("label") or ""),
                "score": round(float(raw.get("score", 0.0)), 4),
            }
        )

    return {
        "detected": detector_result.detected,
        "reason": detector_result.reason,
        "backend_name": detector_result.backend_name,
        "error_message": detector_result.error_message,
        "candidates": candidate_rows,
        "raw_scores": raw_rows,
    }
# Per-intent lexical hints, keyed by intent id.
#
# Every entry provides:
#   "family"  - a group label; several intents share one label (for example
#               the on/off variants of the same device).
#   "actions" - tuple of action phrases associated with the intent.
#   "objects" - tuple of object phrases associated with the intent.
# Some entries additionally provide "qualifiers" (cabin_set_ac) or
# "shared_context" (the cs_* order intents).
#
# NOTE(review): several tuples contain empty-string entries (""). An empty
# string is a substring of every string, so confirm how the consumer of this
# table treats them — they may be single-character tokens that were lost in
# an encoding/copy step and need to be restored.
_INTENT_HINTS: dict[str, dict[str, Any]] = {
    "cabin_ac_on": {
        "family": "ac_power",
        "actions": ("打开", "开启", "启动", ""),
        "objects": ("空调", "ac", "冷气", "冷风"),
    },
    "cabin_ac_off": {
        "family": "ac_power",
        "actions": ("关闭", "关掉", "", "停掉"),
        "objects": ("空调", "ac", "冷气", "冷风"),
    },
    "cabin_set_ac": {
        "family": "ac_temperature",
        "actions": ("调到", "设到", "设成", "调低", "调高", "升高", "降低"),
        "objects": ("空调", "温度", "", "冷气"),
        "qualifiers": ("冷一点", "热一点"),
    },
    "cabin_fan_up": {
        "family": "fan_speed",
        "actions": ("调大", "调高", "加大"),
        "objects": ("风量", ""),
    },
    "cabin_fan_down": {
        "family": "fan_speed",
        "actions": ("调小", "调低", "减小"),
        "objects": ("风量", ""),
    },
    "cabin_defog_front_on": {
        "family": "defog_front",
        "actions": ("打开", "开启", "", "", "除雾"),
        "objects": ("前挡", "前挡风", "前窗", "前玻璃"),
    },
    "cabin_defog_rear_on": {
        "family": "defog_rear",
        "actions": ("打开", "开启", "", "", "除雾"),
        "objects": ("后挡", "后挡风", "后窗", "后玻璃"),
    },
    "cabin_window_open": {
        "family": "window",
        "actions": ("打开", "", "升起来"),
        "objects": ("车窗", "窗户", ""),
    },
    "cabin_window_close": {
        "family": "window",
        "actions": ("关闭", "关上", "关掉", "降下来", "降下", ""),
        "objects": ("车窗", "窗户", ""),
    },
    "cabin_sunroof_open": {
        "family": "sunroof",
        "actions": ("打开", "", "翘起", "翘起来"),
        "objects": ("天窗",),
    },
    "cabin_sunroof_close": {
        "family": "sunroof",
        "actions": ("关闭", "关上", "合上", "关掉", "合起来"),
        "objects": ("天窗",),
    },
    "cabin_lock_doors": {
        "family": "door_lock",
        "actions": ("锁上", "锁住", "上锁", ""),
        "objects": ("车门", "", "车锁"),
    },
    "cabin_unlock_doors": {
        "family": "door_lock",
        "actions": ("解锁", "开锁"),
        "objects": ("车门", "", "车锁"),
    },
    "cabin_nav_to": {
        "family": "nav",
        "actions": ("导航", "", "", "带我去"),
        "objects": ("公司", "机场", "", "目的地"),
    },
    "cabin_nav_cancel": {
        "family": "nav",
        "actions": ("取消", "结束", "停止", "退出", "关掉"),
        "objects": ("导航", "路线"),
    },
    "cabin_play_music": {
        "family": "music_playback",
        "actions": ("播放", "来点", "放点", "", "来首", "来一首"),
        "objects": ("音乐", "", "轻音乐", "摇滚", "流行"),
    },
    "cabin_pause_music": {
        "family": "music_playback",
        "actions": ("暂停", "停掉", "停止"),
        "objects": ("音乐", "", "播放"),
    },
    "cabin_next_track": {
        "family": "track_switch",
        "actions": ("下一首", "切到下一首", "切歌", "换一首"),
        "objects": ("", "歌曲", "音乐"),
    },
    "cabin_previous_track": {
        "family": "track_switch",
        "actions": ("上一首", "切回上一首", "回到上一首"),
        "objects": ("", "歌曲", "音乐"),
    },
    "cabin_volume_up": {
        "family": "volume",
        "actions": ("调大", "加大", "开大", "调高"),
        "objects": ("音量", "声音", "媒体音量"),
    },
    "cabin_volume_down": {
        "family": "volume",
        "actions": ("调小", "压低", "关小", "调低"),
        "objects": ("音量", "声音", "媒体音量"),
    },
    "cabin_volume_mute": {
        "family": "volume",
        "actions": ("静音", "关掉", "关闭"),
        "objects": ("音量", "声音", "音响"),
    },
    "cabin_lights_on": {
        "family": "lights",
        "actions": ("打开", "", "点亮"),
        "objects": ("", "车灯", "大灯"),
    },
    "cabin_lights_off": {
        "family": "lights",
        "actions": ("关闭", "关掉", "", "熄了"),
        "objects": ("", "车灯", "大灯"),
    },
    "cabin_seat_heat_on": {
        "family": "seat_heat",
        "actions": ("打开", "开启", ""),
        "objects": ("座椅加热",),
    },
    "cabin_seat_heat_off": {
        "family": "seat_heat",
        "actions": ("关闭", "关掉", ""),
        "objects": ("座椅加热",),
    },
    "cabin_mirror_fold": {
        "family": "mirror",
        "actions": ("折叠", "收起"),
        "objects": ("后视镜",),
    },
    "cabin_mirror_unfold": {
        "family": "mirror",
        "actions": ("展开", "打开"),
        "objects": ("后视镜",),
    },
    "cabin_wiper_on": {
        "family": "wiper",
        "actions": ("打开", "启动", "开始"),
        "objects": ("雨刷", "雨刮", "雨刮器"),
    },
    "cabin_wiper_off": {
        "family": "wiper",
        "actions": ("关闭", "停掉", "停止"),
        "objects": ("雨刷", "雨刮", "雨刮器"),
    },
    "cs_query_order": {
        "family": "order_query",
        "actions": ("", "查询", "看看"),
        "objects": ("订单",),
        "shared_context": ("订单",),
    },
    "cs_query_logistics": {
        "family": "order_logistics",
        "actions": ("", "查询", "看看"),
        "objects": ("物流", "快递"),
        "shared_context": ("物流", "快递", "订单"),
    },
    "cs_cancel_order": {
        "family": "order_cancel",
        "actions": ("取消", "撤销", "撤单", "不要了"),
        "objects": ("订单",),
        "shared_context": ("订单",),
    },
}