Files
2026-06-11 16:28:00 +08:00

39 lines
1.2 KiB
Python

from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
class WorkflowStep(BaseModel):
step: int
step_id: str
intent_id: str
plugin_id: str
action: str
status: Literal["pending", "running", "completed", "failed", "skipped", "waiting_confirmation"] = "pending"
depends_on: list[str] = Field(default_factory=list)
slots: dict[str, Any] = Field(default_factory=dict)
condition: dict[str, Any] = Field(default_factory=dict)
requires_confirmation: bool = False
timeout_ms: int = 1500
class MissingSlot(BaseModel):
slot_name: str
ask_template: str
priority: int = 1
class Workflow(BaseModel):
workflow_id: str
workflow_type: Literal["single", "sequence", "conditional", "parallel"] = "single"
domain: str
intent_id: str
status: Literal["ready", "waiting_slot", "waiting_confirmation", "running", "completed", "failed"] = "ready"
risk_level: Literal["low", "medium", "high"] = "low"
slots: dict[str, Any] = Field(default_factory=dict)
missing_slots: list[MissingSlot] = Field(default_factory=list)
steps: list[WorkflowStep] = Field(default_factory=list)
meta: dict[str, Any] = Field(default_factory=dict)