Files
ai-device/intelligent_cabin/archive/tests/test_config_loader.py
2026-06-11 16:28:00 +08:00

91 lines
4.1 KiB
Python

from __future__ import annotations
import unittest
from unittest.mock import patch
from app.core.bootstrap import build_planner, load_runtime_bundle
from app.core.config import settings
from app.services.planner import CompositeWorkflowPlanner
from app.services.config_loader import ConfigLoader
from app.services.dialog_rules import DialogRuleEngine
from app.services.response_policy import ResponsePolicy
class ConfigLoaderTests(unittest.TestCase):
def test_loader_reads_domain_actions_and_responses(self) -> None:
bundle = ConfigLoader(
domain_path="config/domain.yml",
action_path="config/actions.yml",
response_path="config/responses.yml",
form_path="config/forms.yml",
rule_path="config/rules.yml",
dialog_act_path="config/dialog_acts.yml",
workflow_path="config/workflows.yml",
legacy_intent_path="app/data/intents.json",
).load()
self.assertGreaterEqual(len(bundle.intent_registry.list()), 30)
self.assertEqual(bundle.intent_registry.get("cabin_window_open").plugin_id, "plugin.cabin.window.open")
self.assertEqual(bundle.intent_hints.get("cabin_window_open"), "打开车窗")
self.assertEqual(bundle.response_templates.get("task_stopped"), "好的,已停止当前任务。")
self.assertEqual(bundle.intent_registry.get("cabin_set_ac").required_slots, ["temperature"])
self.assertTrue(bundle.dialog_rules.is_stop_request("先不要了"))
self.assertEqual(bundle.dialog_rules.parse_confirmation_decision("确认"), True)
self.assertEqual(bundle.dialog_act_engine.detect("你好"), "chitchat")
self.assertGreaterEqual(len(bundle.workflow_templates.templates), 2)
def test_bootstrap_runtime_bundle_is_available(self) -> None:
bundle = load_runtime_bundle()
self.assertGreaterEqual(len(bundle.intent_registry.list()), 30)
self.assertIn("fallback", bundle.response_templates)
self.assertEqual(bundle.dialog_act_engine.detect("确认"), "affirm")
def test_response_policy_can_be_driven_by_config_templates(self) -> None:
policy = ResponsePolicy(
templates={"reject": "这个能力暂未开通。"},
intent_hints={"cabin_window_open": "开车窗"},
)
self.assertEqual(policy.reject(), "这个能力暂未开通。")
self.assertEqual(policy.clarify(["cabin_window_open"]), "请确认一下,你是想开车窗吗?")
def test_response_policy_formats_multi_step_summary_naturally(self) -> None:
policy = ResponsePolicy()
summary = policy.workflow_summary(["好的,已打开空调。", "已将空调调到 20 度。"])
self.assertEqual(summary, "好,空调已经打开了,也调到 20 度了。")
def test_response_policy_formats_multi_step_summary_in_vehicle_style(self) -> None:
policy = ResponsePolicy()
summary = policy.workflow_summary(["好的,已打开空调。", "好的,已关闭车窗。"])
self.assertEqual(summary, "好,空调已经打开了,车窗也帮你关上了。")
def test_build_planner_prefers_local_planners_before_cloud(self) -> None:
with patch.object(settings, "planner_backend", "dashscope"):
planner = build_planner()
self.assertIsInstance(planner, CompositeWorkflowPlanner)
self.assertIsInstance(planner._planners[0], CompositeWorkflowPlanner)
def test_dialog_rule_engine_supports_configured_confirmation_and_stop(self) -> None:
rules = DialogRuleEngine(
stop_phrases=("先不用了",),
positive_confirmation_tokens=("好,继续",),
negative_confirmation_tokens=("取消吧",),
confirmation_required_intents=("foo",),
confirmation_required_risk_levels=("high",),
)
self.assertTrue(rules.is_stop_request("先不用了"))
self.assertTrue(rules.parse_confirmation_decision("好,继续"))
self.assertFalse(rules.parse_confirmation_decision("取消吧"))
self.assertTrue(rules.requires_confirmation("foo", "low"))
if __name__ == "__main__":
unittest.main()