feat(api-converter): 添加进销存采购订单API配置并实现本地缓存机制

新增api_config_9p04kww1pu.json配置文件,包含进销存采购订单相关的四个核心
API接口(查询列表、新建、详情、编辑),完善了load_api_configs函数,
增加本地文件缓存机制,支持从本地文件加载配置并在配置变更时同步保存,
优化refresh_api_configs函数以同步清理本地文件缓存。

BREAKING CHANGE: API配置方式调整,引入本地缓存机制可能影响原有部署流程
This commit is contained in:
2026-02-07 15:48:01 +08:00
parent 41c3d7a8fd
commit e18c661368
35 changed files with 2375 additions and 362 deletions

View File

@@ -19,12 +19,12 @@ from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
try:
from .schema_converter import convert_sql_params_to_input_schema
from .schema_converter import convert_sql_params_to_input_schema, sanitize_json_schema
from .utils.api_client import execute_workflow, get_workflow_by_id, extract_final_output
from .utils.env_config import get_workflow_id
from .utils.logger_config import setup_system_logging, get_logger
except ImportError:
from schema_converter import convert_sql_params_to_input_schema
from schema_converter import convert_sql_params_to_input_schema, sanitize_json_schema
from utils.api_client import execute_workflow, get_workflow_by_id, extract_final_output
from utils.env_config import get_workflow_id
from utils.logger_config import setup_system_logging, get_logger
@@ -174,13 +174,30 @@ async def handle_list_tools() -> list[types.Tool]:
for query in _tools_config:
name = query.get("name", "")
description = query.get("description") or query.get("toolPrompt") or query.get("uniqueName", "")
sql_params = query.get("sqlParams", "[]")
logger.debug(f"处理工具配置: name={name}, description={description[:50] if description else 'None'}...")
# 转换参数为 inputSchema
input_schema = convert_sql_params_to_input_schema(sql_params)
logger.debug(f"工具 {name} 的 inputSchema: {json.dumps(input_schema, ensure_ascii=False)}")
# 优先使用 inputJsonSchema,如果不存在则从 sqlParams 转换
input_json_schema = query.get("inputJsonSchema")
if input_json_schema:
try:
# inputJsonSchema 是 JSON 字符串,需要解析
if isinstance(input_json_schema, str):
input_schema = json.loads(input_json_schema)
else:
input_schema = input_json_schema
# 清理 schema确保 enum 只包含字符串
input_schema = sanitize_json_schema(input_schema)
logger.debug(f"使用 inputJsonSchema (已清理): {json.dumps(input_schema, ensure_ascii=False)}")
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"解析 inputJsonSchema 失败: {e},回退到 sqlParams")
sql_params = query.get("sqlParams", "[]")
input_schema = convert_sql_params_to_input_schema(sql_params)
else:
# 从 sqlParams 转换
sql_params = query.get("sqlParams", "[]")
input_schema = convert_sql_params_to_input_schema(sql_params)
logger.debug(f"从 sqlParams 转换的 inputSchema: {json.dumps(input_schema, ensure_ascii=False)}")
tools.append(
types.Tool(

View File

@@ -12,6 +12,41 @@ import json
from typing import Any
def sanitize_json_schema(schema: dict) -> dict:
"""
清理 JSON Schema确保 enum 字段只包含字符串值
Args:
schema: 原始 JSON Schema
Returns:
dict: 清理后的 JSON Schema
"""
if not isinstance(schema, dict):
return schema
# 递归处理 properties
if "properties" in schema and isinstance(schema["properties"], dict):
for prop_name, prop_schema in schema["properties"].items():
if isinstance(prop_schema, dict) and "enum" in prop_schema:
enum_values = prop_schema["enum"]
if isinstance(enum_values, list) and len(enum_values) > 0:
# 检查是否是对象数组
if isinstance(enum_values[0], dict):
# 提取 value 字段
cleaned_enum = [
item.get("value", item.get("label", ""))
for item in enum_values
if isinstance(item, dict)
]
# 过滤空值
prop_schema["enum"] = [v for v in cleaned_enum if v]
# 保留原始数据
prop_schema["x-options"] = enum_values
return schema
def convert_param_to_schema_property(param: dict) -> tuple[str, dict, bool]:
"""
将单个参数转换为 JSON Schema property
@@ -57,7 +92,22 @@ def convert_param_to_schema_property(param: dict) -> tuple[str, dict, bool]:
elif param_type == "select":
property_schema["type"] = "string"
if options:
property_schema["enum"] = options
# 处理 options 可能是对象数组 [{label, value}] 或字符串数组的情况
if isinstance(options, list) and len(options) > 0:
if isinstance(options[0], dict):
# 对象数组,提取 value 字段作为 enum过滤空值
enum_values = [
opt.get("value", opt.get("label", ""))
for opt in options
if isinstance(opt, dict)
]
# 过滤掉空字符串
property_schema["enum"] = [v for v in enum_values if v]
# 保留原始 options 供前端使用(如果前端支持)
property_schema["x-options"] = options
else:
# 字符串数组,直接使用,过滤空值
property_schema["enum"] = [v for v in options if v]
elif param_type == "number":
property_schema["type"] = "number"

View File

@@ -7,8 +7,8 @@ import os
if __name__ == "__main__":
# 设置环境变量
os.environ["workflowId"] = "2005892514011795457"
os.environ["workflowExecuteKey"] = "wf_ce270212b2ee45ab9c81714a7c243c56"
os.environ["workflowId"] = "2019984063311556609"
os.environ["workflowExecuteKey"] = "wf_45bd630402664485a2ef5cd3ede5aa53"
os.environ["backendBaseUrl"] = "http://192.168.2.236:8088"
# Import and run the actual MCP server