diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/.python-version b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/README.md b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/README.md new file mode 100644 index 0000000..e69de29 diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/main.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..1bb0314 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/main.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/schema_converter.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/schema_converter.cpython-312.pyc new file mode 100644 index 0000000..8449988 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/__pycache__/schema_converter.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/businessQueries.json b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/businessQueries.json new file mode 100644 index 0000000..9e90f97 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/businessQueries.json @@ -0,0 +1,80 @@ +[ + { + "id": "2000851983567065089", + "createBy": "bjt", + "createTime": "2025-12-16 16:54:22", + "updateBy": null, + "updateTime": null, + "serviceId": "2000821852159709186", + "uniqueName": "按摘要查凭证", + "name": "anzhaiyaochapingzheng_91da19f8", + "description": "按照摘要描述查询凭证信息", + "visualizable": 1, + "toolPrompt": "按照摘要描述查询凭证信息", + "toolType": "sql", + "datasourceId": "158", + "sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{digest}' AS digest_keyword,\n '{company_names}' AS company_names_param,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n)\n\nSELECT \n c.company_name AS 公司,\n v.dbill_date AS 日期,\n v.csign || LPAD(v.ino_id::text, 4, '0') AS 凭证号,\n v.cdigest AS 摘要,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n v.md AS 借方,\n v.mc AS 贷方,\n v.cbill AS 制单人\nFROM uf_voucher v\nJOIN uf_company c ON v.company_id = c.id\nJOIN uf_account_code a ON v.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE v.bdelete = false\n AND v.iyear = qp.target_year\n AND v.cdigest LIKE '%' || qp.digest_keyword || '%'\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR v.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, v.dbill_date DESC, v.ino_id;", + "sqlParams": "[{\"type\":\"paragraph\",\"name\":\"digest\",\"displayName\":\"摘要关键字\",\"maxLength\":500,\"defaultValue\":\"\",\"required\":true},{\"type\":\"string\",\"name\":\"company_names\",\"displayName\":\"公司名称\",\"maxLength\":200,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"会计年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"string\",\"name\":\"months\",\"displayName\":\"月份\",\"maxLength\":50,\"defaultValue\":\"\",\"required\":false}]", + "resultType": "list", + "sourceType": "ai", + "trainingTaskId": null, + "tableMetadataIds": "", + "executionCount": 0, + "visualizationConfigs": [], + "inputJsonSchema": "{}", + "outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}", + "lastExecutionTime": null + }, + { + "id": "2000852304343240706", + "createBy": "bjt", + "createTime": "2025-12-16 16:55:38", + "updateBy": null, + "updateTime": null, + "serviceId": "2000821852159709186", + "uniqueName": "按科目查凭证", + "name": "ankemuchapingzheng_6a268ea1", + "description": "按照科目名称查询凭证信息", + "visualizable": 1, + "toolPrompt": "按照科目名称查询凭证信息", + "toolType": "sql", + "datasourceId": "158", + "sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{company_names}' AS company_names_param,\n '{subject_name}' AS subject_keyword,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n),\n目标科目 AS (\n SELECT DISTINCT a.id, a.ccode\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.ccode_name LIKE '%' || qp.subject_keyword || '%'\n AND a.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n),\n相关科目 AS (\n SELECT a.id\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.iyear = qp.target_year\n AND EXISTS (\n SELECT 1 FROM 目标科目 t \n WHERE a.ccode LIKE t.ccode || '%'\n )\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n)\nSELECT \n c.company_name AS 公司,\n v.dbill_date AS 日期,\n v.csign || LPAD(v.ino_id::text, 4, '0') AS 凭证号,\n v.cdigest AS 摘要,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n a.igrade AS 级次,\n v.md AS 借方,\n v.mc AS 贷方\nFROM uf_voucher v\nJOIN uf_company c ON v.company_id = c.id\nJOIN uf_account_code a ON v.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE v.bdelete = false\n AND v.account_code_id IN (SELECT id FROM 相关科目)\n AND v.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR v.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, a.ccode, v.dbill_date, v.ino_id;", + "sqlParams": "[{\"type\":\"select\",\"name\":\"subject_name\",\"displayName\":\"科目名称\",\"maxLength\":100,\"defaultValue\":\"\",\"required\":true,\"options\":[\"管理费用\",\"销售费用\",\"财务费用\",\"应收账款\",\"应付账款\"]},{\"type\":\"paragraph\",\"name\":\"company_names\",\"displayName\":\"公司名称列表\",\"maxLength\":300,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"select\",\"name\":\"months\",\"displayName\":\"期间月份\",\"maxLength\":30,\"defaultValue\":\"\",\"required\":false,\"options\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\",\"11\",\"12\"]}]", + "resultType": "list", + "sourceType": "ai", + "trainingTaskId": null, + "tableMetadataIds": "", + "executionCount": 0, + "visualizationConfigs": [], + "inputJsonSchema": "{}", + "outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}", + "lastExecutionTime": null + }, + { + "id": "2000852610904920066", + "createBy": "bjt", + "createTime": "2025-12-16 16:56:51", + "updateBy": null, + "updateTime": null, + "serviceId": "2000821852159709186", + "uniqueName": "按科目查余额", + "name": "ankemuchayue_68b302c2", + "description": "按照科目名称查询余额", + "visualizable": 1, + "toolPrompt": "按照科目名称查询余额", + "toolType": "sql", + "datasourceId": "158", + "sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{company_names}' AS company_names_param,\n '{subject_name}' AS subject_keyword,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n),\n目标科目 AS (\n SELECT DISTINCT a.id, a.ccode\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.ccode_name LIKE '%' || qp.subject_keyword || '%'\n AND a.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n),\n相关科目 AS (\n SELECT a.id\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.iyear = qp.target_year\n AND EXISTS (\n SELECT 1 FROM 目标科目 t \n WHERE a.ccode LIKE t.ccode || '%'\n )\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n)\nSELECT \n c.company_name AS 公司,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n a.igrade AS 级次,\n a.cclass AS 类别,\n b.iyear AS 年,\n b.iperiod AS 月,\n b.cbegind_c AS 期初方向,\n b.mb AS 期初余额,\n b.md AS 借方发生,\n b.mc AS 贷方发生,\n b.cendd_c AS 期末方向,\n b.me AS 期末余额\nFROM uf_account_balance b\nJOIN uf_company c ON b.company_id = c.id\nJOIN uf_account_code a ON b.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE b.iyear = qp.target_year\n AND b.account_code_id IN (SELECT id FROM 相关科目)\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR b.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, a.ccode, b.iperiod;", + "sqlParams": "[{\"type\":\"string\",\"name\":\"subject_name\",\"displayName\":\"科目名称关键字\",\"maxLength\":100,\"defaultValue\":\"银行存款\",\"required\":true},{\"type\":\"string\",\"name\":\"company_names\",\"displayName\":\"公司筛选\",\"maxLength\":150,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"查询年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"number\",\"name\":\"months\",\"displayName\":\"查询月份\",\"maxLength\":2,\"defaultValue\":\"\",\"required\":false}]", + "resultType": "list", + "sourceType": "ai", + "trainingTaskId": null, + "tableMetadataIds": "", + "executionCount": 0, + "visualizationConfigs": [], + "inputJsonSchema": "{}", + "outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}", + "lastExecutionTime": null + } +] \ No newline at end of file diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/logs/.gitkeep b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/main.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/main.py new file mode 100644 index 0000000..dd4a0f6 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/main.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +""" +Workflow to MCP Server +从 businessQueries.json 或 API 动态生成 MCP 工具 + +支持两种模式: +- local: 从本地 JSON 文件加载 +- api: 从远程 API 加载 +""" +import json +import os +import logging +import argparse +import anyio + +import mcp.types as types +from mcp.server import NotificationOptions, Server +from mcp.server.models import InitializationOptions +from mcp.server.stdio import stdio_server + +try: + from .schema_converter import convert_sql_params_to_input_schema + from .utils.api_client import execute_workflow, get_workflow_by_id + from .utils.env_config import get_workflow_id + from .utils.logger_config import setup_system_logging, get_logger +except ImportError: + from schema_converter import convert_sql_params_to_input_schema + from utils.api_client import execute_workflow, get_workflow_by_id + from utils.env_config import get_workflow_id + from utils.logger_config import setup_system_logging, get_logger + +# 初始化日志系统 +setup_system_logging(app_name="lzwcai_workflow_to_mcp", log_level=logging.DEBUG) +logger = get_logger(__name__) + +# 初始化 MCP Server +server = Server("workflow_mcp_server") + +# 全局配置 +_tools_config: list[dict] = [] +_config: dict = {} + + +def parse_arguments(): + """解析命令行参数""" + parser = argparse.ArgumentParser(description="Workflow MCP Server") + parser.add_argument( + "--mode", + type=str, + choices=["local", "api"], + default="api", + help="数据加载模式: local(本地JSON) 或 api(远程API,默认)" + ) + parser.add_argument( + "--json-path", + type=str, + default=None, + help="本地 JSON 文件路径 (local 模式)" + ) + parser.add_argument( + "--workflow-id", + type=str, + default=None, + help="工作流ID (api 模式,可选,默认从环境变量 workflowId 获取)" + ) + return parser.parse_args() + + +class DataLoader: + """数据加载器基类""" + + def load(self) -> list[dict]: + raise NotImplementedError + + +class LocalLoader(DataLoader): + """本地 JSON 文件加载器""" + + def __init__(self, json_path: str = None): + if json_path is None: + json_path = os.path.join(os.path.dirname(__file__), "businessQueries.json") + self.json_path = json_path + + def load(self) -> list[dict]: + try: + with open(self.json_path, "r", encoding="utf-8") as f: + data = json.load(f) + logger.info(f"从本地加载 {len(data)} 条配置: {self.json_path}") + return data + except FileNotFoundError: + logger.error(f"配置文件不存在: {self.json_path}") + return [] + except json.JSONDecodeError as e: + logger.error(f"JSON 解析错误: {e}") + return [] + + +class ApiLoader(DataLoader): + """API 远程加载器 - 使用 get_workflow_by_id 获取工作流配置""" + + def __init__(self, workflow_id: str = None): + self.workflow_id = workflow_id or get_workflow_id() + logger.debug(f"ApiLoader 初始化,工作流ID: {self.workflow_id}") + + def load(self) -> list[dict]: + try: + if not self.workflow_id: + logger.error("未提供工作流ID,请设置环境变量 workflowId") + return [] + + logger.info(f"开始从 API 加载工作流配置,工作流ID: {self.workflow_id}") + + # 调用 get_workflow_by_id 获取工作流配置 + response = get_workflow_by_id(self.workflow_id) + + logger.debug(f"API 响应原始数据: {json.dumps(response, ensure_ascii=False, indent=2)}") + + # 返回格式: {code, data, msg} + if response.get("code") != 200: + logger.error(f"获取工作流配置失败: code={response.get('code')}, msg={response.get('msg')}") + return [] + + data = response.get("data") + logger.debug(f"API 响应 data 字段: {json.dumps(data, ensure_ascii=False, indent=2) if data else 'None'}") + + # data 可能是单个对象或列表 + if isinstance(data, dict): + result = [data] + elif isinstance(data, list): + result = data + else: + logger.warning(f"API 响应 data 字段类型异常: {type(data)}") + result = [] + + logger.info(f"从 API 加载工作流配置成功,工作流ID: {self.workflow_id}, 配置数量: {len(result)}") + return result + + except Exception as e: + logger.error(f"API 请求失败: {e}", exc_info=True) + return [] + + +def create_loader(mode: str, **kwargs) -> DataLoader: + """ + 创建数据加载器 + + Args: + mode: "local" 或 "api" + kwargs: + - json_path: 本地 JSON 路径 (local 模式) + - workflow_id: 工作流ID (api 模式,可选,默认从环境变量获取) + """ + if mode == "local": + return LocalLoader(json_path=kwargs.get("json_path")) + elif mode == "api": + return ApiLoader(workflow_id=kwargs.get("workflow_id")) + else: + raise ValueError(f"不支持的模式: {mode}") + + +def init_tools(loader: DataLoader): + """初始化工具配置""" + global _tools_config + _tools_config = loader.load() + logger.info(f"已加载 {len(_tools_config)} 个工具配置") + + +@server.list_tools() +async def handle_list_tools() -> list[types.Tool]: + """列出所有可用工具""" + logger.info(f"收到 ListTools 请求,当前配置数量: {len(_tools_config)}") + tools = [] + + for query in _tools_config: + name = query.get("name", "") + description = query.get("description") or query.get("toolPrompt") or query.get("uniqueName", "") + sql_params = query.get("sqlParams", "[]") + + logger.debug(f"处理工具配置: name={name}, description={description[:50] if description else 'None'}...") + + # 转换参数为 inputSchema + input_schema = convert_sql_params_to_input_schema(sql_params) + logger.debug(f"工具 {name} 的 inputSchema: {json.dumps(input_schema, ensure_ascii=False)}") + + tools.append( + types.Tool( + name=name, + description=description, + inputSchema=input_schema + ) + ) + + logger.info(f"ListTools 响应: 返回 {len(tools)} 个工具") + return tools + + +@server.call_tool() +async def handle_call_tool( + name: str, + arguments: dict | None +) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: + """调用工具""" + logger.info(f"收到 CallTool 请求: name={name}, arguments={json.dumps(arguments, ensure_ascii=False) if arguments else 'None'}") + + # 查找对应的工具配置 + tool_config = None + for query in _tools_config: + if query.get("name") == name: + tool_config = query + break + + if tool_config is None: + logger.error(f"未找到工具配置: {name}") + raise ValueError(f"未知工具: {name}") + + logger.debug(f"找到工具配置: {json.dumps(tool_config, ensure_ascii=False, indent=2)}") + + # 获取工作流ID + workflow_id = tool_config.get("workflowId") or get_workflow_id() + logger.info(f"使用工作流ID: {workflow_id}") + + # 构建请求数据 + request_data = { + "workflowId": workflow_id, + "inputs": arguments or {} + } + + logger.info(f"执行工作流请求数据: {json.dumps(request_data, ensure_ascii=False, indent=2)}") + + try: + # 调用工作流执行API + result = execute_workflow(request_data) + logger.info(f"工作流执行成功: {workflow_id}") + logger.debug(f"工作流执行结果: {json.dumps(result, ensure_ascii=False, indent=2)}") + except Exception as e: + logger.error(f"工作流执行失败: {e}", exc_info=True) + result = { + "error": str(e), + "tool_name": name, + "workflow_id": workflow_id + } + + return [ + types.TextContent( + type="text", + text=json.dumps(result, ensure_ascii=False, indent=2) + ) + ] + + +async def run_server(): + """运行 MCP Server (stdio 模式)""" + async with stdio_server() as streams: + await server.run( + streams[0], + streams[1], + InitializationOptions( + server_name="workflow_mcp_server", + server_version="0.1.0", + capabilities=server.get_capabilities( + notification_options=NotificationOptions(), + experimental_capabilities={}, + ), + ), + ) + + +def main(): + """主入口""" + global _config + + logger.info("=" * 50) + logger.info("Workflow MCP Server 启动") + logger.info("=" * 50) + + # 解析命令行参数 + args = parse_arguments() + _config = vars(args) + logger.info(f"命令行参数: {_config}") + + # 创建加载器并初始化工具 + logger.info(f"使用模式: {args.mode}") + loader = create_loader( + mode=args.mode, + json_path=args.json_path, + workflow_id=args.workflow_id + ) + init_tools(loader) + + logger.info("开始运行 MCP Server (stdio 模式)") + # 运行服务器 + anyio.run(run_server) + + +if __name__ == "__main__": + main() diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/pyproject.toml b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/pyproject.toml new file mode 100644 index 0000000..1a485da --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "lzwcai-workflow-to-mcp" +version = "0.1.0" +description = "从 businessQueries 动态生成 MCP 工具" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "mcp>=1.0.0", + "anyio>=4.0.0", + "requests>=2.28.0", +] + +[project.scripts] +workflow-mcp = "main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/schema_converter.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/schema_converter.py new file mode 100644 index 0000000..405489d --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/schema_converter.py @@ -0,0 +1,208 @@ +""" +Schema 转换器 +将 sqlParams 数组格式转换为 MCP 工具需要的 JSON Schema 格式 + +支持的类型: +- string: 文本输入 +- paragraph: 段落/多行文本 +- select: 下拉选项 +- number: 数字输入 +""" +import json +from typing import Any + + +def convert_param_to_schema_property(param: dict) -> tuple[str, dict, bool]: + """ + 将单个参数转换为 JSON Schema property + + Args: + param: 参数配置,格式如: + { + "type": "string", + "name": "company_names", + "displayName": "公司名称", + "maxLength": 200, + "defaultValue": "", + "required": true, + "options": ["选项1", "选项2"] # 仅 select 类型 + } + + Returns: + tuple: (property_name, property_schema, is_required) + """ + param_type = param.get("type", "string") + param_name = param.get("name", "") + display_name = param.get("displayName", param_name) + default_value = param.get("defaultValue", "") + max_length = param.get("maxLength") + is_required = param.get("required", False) + options = param.get("options", []) + + property_schema = { + "description": display_name + } + + if param_type == "string": + property_schema["type"] = "string" + if max_length: + property_schema["maxLength"] = max_length + + elif param_type == "paragraph": + property_schema["type"] = "string" + property_schema["format"] = "paragraph" + if max_length: + property_schema["maxLength"] = max_length + + elif param_type == "select": + property_schema["type"] = "string" + if options: + property_schema["enum"] = options + + elif param_type == "number": + property_schema["type"] = "number" + + else: + # 默认当作 string 处理 + property_schema["type"] = "string" + + # 添加默认值 + if default_value not in (None, ""): + if param_type == "number": + try: + property_schema["default"] = int(default_value) if str(default_value).isdigit() else float(default_value) + except (ValueError, TypeError): + property_schema["default"] = default_value + else: + property_schema["default"] = default_value + + return param_name, property_schema, is_required + + +def convert_sql_params_to_input_schema(sql_params: str | list) -> dict: + """ + 将 sqlParams 转换为 MCP 工具的 inputSchema + + Args: + sql_params: sqlParams 字段值,可以是 JSON 字符串或已解析的列表 + 格式: [{"type": "string", "name": "xxx", ...}, ...] + + Returns: + dict: MCP 工具的 inputSchema,格式如: + { + "type": "object", + "properties": {...}, + "required": [...] + } + """ + # 解析 JSON 字符串 + if isinstance(sql_params, str): + try: + params_list = json.loads(sql_params) + except json.JSONDecodeError: + return {"type": "object", "properties": {}, "required": []} + else: + params_list = sql_params + + if not isinstance(params_list, list): + return {"type": "object", "properties": {}, "required": []} + + input_schema = { + "type": "object", + "properties": {}, + "required": [] + } + + for param in params_list: + if not isinstance(param, dict): + continue + + name, schema, is_required = convert_param_to_schema_property(param) + + if name: + input_schema["properties"][name] = schema + if is_required: + input_schema["required"].append(name) + + return input_schema + + +def convert_business_query_to_tool(query: dict) -> dict: + """ + 将单个 businessQuery 转换为 MCP Tool 配置 + + Args: + query: businessQuery 对象 + + Returns: + dict: MCP Tool 配置 + { + "name": "工具名称", + "description": "工具描述", + "inputSchema": {...} + } + """ + name = query.get("name", "") + description = query.get("description") or query.get("toolPrompt") or query.get("uniqueName", "") + sql_params = query.get("sqlParams", "[]") + + input_schema = convert_sql_params_to_input_schema(sql_params) + + return { + "name": name, + "description": description, + "inputSchema": input_schema, + # 保留原始数据供后续使用 + "_raw": { + "id": query.get("id"), + "uniqueName": query.get("uniqueName"), + "sqlTemplate": query.get("sqlTemplate"), + "datasourceId": query.get("datasourceId"), + "toolType": query.get("toolType"), + } + } + + +def convert_all_queries_to_tools(queries: list[dict]) -> list[dict]: + """ + 将所有 businessQueries 转换为 MCP Tools 列表 + + Args: + queries: businessQueries 列表 + + Returns: + list: MCP Tools 配置列表 + """ + tools = [] + for query in queries: + tool = convert_business_query_to_tool(query) + if tool["name"]: + tools.append(tool) + return tools + + +# 测试用 +if __name__ == "__main__": + # 测试单个参数转换 + test_param = { + "type": "select", + "name": "subject_name", + "displayName": "科目名称", + "maxLength": 100, + "defaultValue": "", + "required": True, + "options": ["管理费用", "销售费用", "财务费用"] + } + + name, schema, required = convert_param_to_schema_property(test_param) + print(f"参数名: {name}") + print(f"Schema: {json.dumps(schema, ensure_ascii=False, indent=2)}") + print(f"必填: {required}") + print() + + # 测试完整 sqlParams 转换 + test_sql_params = '[{"type":"paragraph","name":"digest","displayName":"摘要关键字","maxLength":500,"defaultValue":"","required":true},{"type":"string","name":"company_names","displayName":"公司名称","maxLength":200,"defaultValue":"","required":false},{"type":"number","name":"year","displayName":"会计年度","maxLength":4,"defaultValue":"2025","required":true}]' + + input_schema = convert_sql_params_to_input_schema(test_sql_params) + print("InputSchema:") + print(json.dumps(input_schema, ensure_ascii=False, indent=2)) diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__init__.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__init__.py new file mode 100644 index 0000000..53da55d --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__init__.py @@ -0,0 +1,33 @@ +"""Utils package for lzwcai_workflow_to_mcp""" + +from .json_helper import load_json +from .name_helper import generate_tool_name +from .schema_helper import generate_input_schema, validate_input_schema +from .api_client import ( + WorkflowAPIClient, + get_workflow_by_id, + execute_workflow, + process_workflow_response, + get_default_client, + DEFAULT_TIMEOUT, + WORKFLOW_EXECUTE_TIMEOUT +) +from .env_config import get_workflow_id, get_backend_base_url, get_env_config, set_env_variable + +__all__ = [ + 'load_json', + 'generate_tool_name', + 'generate_input_schema', + 'validate_input_schema', + 'WorkflowAPIClient', + 'get_workflow_by_id', + 'execute_workflow', + 'process_workflow_response', + 'get_default_client', + 'DEFAULT_TIMEOUT', + 'WORKFLOW_EXECUTE_TIMEOUT', + 'get_workflow_id', + 'get_backend_base_url', + 'get_env_config', + 'set_env_variable' +] diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/__init__.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..a94cf01 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/__init__.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/api_client.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/api_client.cpython-312.pyc new file mode 100644 index 0000000..87ce3b9 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/api_client.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/env_config.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/env_config.cpython-312.pyc new file mode 100644 index 0000000..0c048b2 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/env_config.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/json_helper.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/json_helper.cpython-312.pyc new file mode 100644 index 0000000..8aad326 Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/json_helper.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/logger_config.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/logger_config.cpython-312.pyc new file mode 100644 index 0000000..1da07dc Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/logger_config.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/name_helper.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/name_helper.cpython-312.pyc new file mode 100644 index 0000000..b61c36d Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/name_helper.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/schema_helper.cpython-312.pyc b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/schema_helper.cpython-312.pyc new file mode 100644 index 0000000..9eb072d Binary files /dev/null and b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/__pycache__/schema_helper.cpython-312.pyc differ diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/api_client.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/api_client.py new file mode 100644 index 0000000..ef72edb --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/api_client.py @@ -0,0 +1,307 @@ +""" +工作流API调用客户端 +用于调用工作流管理和执行接口 +""" + +import httpx +import json +from typing import Dict, Any, Optional, List + +try: + from .env_config import get_backend_base_url, get_workflow_execute_key + from .logger_config import get_logger +except ImportError: + from env_config import get_backend_base_url, get_workflow_execute_key + from logger_config import get_logger + +logger = get_logger(__name__) + +# 默认超时配置(秒) +DEFAULT_TIMEOUT = 30.0 # 普通请求超时 +WORKFLOW_EXECUTE_TIMEOUT = 300.0 # 工作流执行超时(5分钟) + + +class WorkflowAPIClient: + """工作流API客户端""" + + def __init__( + self, + base_url: Optional[str] = None, + token: Optional[str] = None, + default_timeout: float = DEFAULT_TIMEOUT, + execute_timeout: float = WORKFLOW_EXECUTE_TIMEOUT + ): + """ + 初始化API客户端 + + Args: + base_url: API基础URL(默认从环境变量 backendBaseUrl 读取) + token: 认证令牌(默认从环境变量 workflowExecuteKey 读取) + default_timeout: 普通请求超时时间(秒),默认30秒 + execute_timeout: 工作流执行超时时间(秒),默认300秒(5分钟) + """ + if base_url is None: + base_url = get_backend_base_url() + + if token is None: + token = get_workflow_execute_key() + + self.base_url = base_url.rstrip('/') + self.token = token + self.default_timeout = default_timeout + self.execute_timeout = execute_timeout + self._client: Optional[httpx.Client] = None + + logger.debug(f"WorkflowAPIClient 初始化: base_url={self.base_url}, token={'*' * 10 if self.token else 'None'}, execute_timeout={self.execute_timeout}s") + + @property + def client(self) -> httpx.Client: + """懒加载 HTTP 客户端""" + if self._client is None: + self._client = httpx.Client(timeout=self.default_timeout) + return self._client + + def _get_headers(self) -> Dict[str, str]: + """获取请求头""" + return { + 'X-API-Key': f'Bearer {self.token}', + } + + def get_workflow_by_id(self, workflow_id: str) -> Dict[str, Any]: + """ + 根据工作流ID获取工作流信息 + + Args: + workflow_id: 工作流ID + + Returns: + API响应数据 + + Raises: + Exception: 请求失败时抛出 + """ + url = f"{self.base_url}/system/workflowManage/getByWorkflowId/{workflow_id}" + + try: + logger.info(f"[API请求] GET {url}") + logger.debug(f"[API请求] Headers: {self._get_headers()}") + + response = self.client.get( + url, + headers=self._get_headers() + ) + + logger.info(f"[API响应] HTTP {response.status_code}") + logger.debug(f"[API响应] Headers: {dict(response.headers)}") + + response.raise_for_status() + data = response.json() + + logger.info(f"[API响应] 获取工作流配置成功: workflow_id={workflow_id}") + logger.debug(f"[API响应] Body: {json.dumps(data, ensure_ascii=False, indent=2)}") + + return data + + except httpx.TimeoutException: + error_msg = f"API请求超时: {url}" + logger.error(f"[API错误] {error_msg}") + raise Exception(error_msg) + + except httpx.HTTPStatusError as e: + error_msg = f"API请求失败 (HTTP {e.response.status_code}): {url}" + logger.error(f"[API错误] {error_msg}") + logger.error(f"[API错误] 响应内容: {e.response.text}") + raise Exception(error_msg) + + except httpx.RequestError as e: + error_msg = f"API请求异常: {url}, 错误: {str(e)}" + logger.error(f"[API错误] {error_msg}") + raise Exception(error_msg) + + except Exception as e: + error_msg = f"处理API响应时出错: {str(e)}" + logger.error(f"[API错误] {error_msg}", exc_info=True) + raise Exception(error_msg) + + def execute_workflow(self, request_data: Dict[str, Any]) -> Dict[str, Any]: + """ + 执行工作流 + + Args: + request_data: 请求数据,包含工作流执行所需的参数 + + Returns: + API响应数据 + + Raises: + Exception: 请求失败时抛出 + """ + url = f"{self.base_url}/open/workflow/execute" + + try: + headers = self._get_headers() + headers['Content-Type'] = 'application/json' + headers['Accept'] = '*/*' + + logger.info(f"[API请求] POST {url} (超时: {self.execute_timeout}s)") + logger.debug(f"[API请求] Headers: {headers}") + logger.debug(f"[API请求] Body: {json.dumps(request_data, ensure_ascii=False, indent=2)}") + + # 使用更长的超时时间执行工作流 + response = self.client.post( + url, + headers=headers, + json=request_data, + timeout=self.execute_timeout # 使用工作流执行专用超时时间 + ) + + logger.info(f"[API响应] HTTP {response.status_code}") + logger.debug(f"[API响应] Headers: {dict(response.headers)}") + + response.raise_for_status() + data = response.json() + + logger.info(f"[API响应] 执行工作流成功") + logger.debug(f"[API响应] Body: {json.dumps(data, ensure_ascii=False, indent=2)}") + + return data + + except httpx.TimeoutException: + error_msg = f"执行工作流API请求超时: {url}" + logger.error(f"[API错误] {error_msg}") + raise Exception(error_msg) + + except httpx.HTTPStatusError as e: + error_msg = f"执行工作流API请求失败 (HTTP {e.response.status_code}): {url}" + logger.error(f"[API错误] {error_msg}") + logger.error(f"[API错误] 响应内容: {e.response.text}") + raise Exception(error_msg) + + except httpx.RequestError as e: + error_msg = f"执行工作流API请求异常: {url}, 错误: {str(e)}" + logger.error(f"[API错误] {error_msg}") + raise Exception(error_msg) + + except Exception as e: + error_msg = f"处理执行工作流API响应时出错: {str(e)}" + logger.error(f"[API错误] {error_msg}", exc_info=True) + raise Exception(error_msg) + + def close(self): + """关闭HTTP客户端""" + if self._client is not None: + self._client.close() + self._client = None + + def __enter__(self): + """支持 context manager""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """退出时关闭客户端""" + self.close() + return False + + +# 懒加载的默认客户端 +_default_client: Optional[WorkflowAPIClient] = None + + +def get_default_client() -> WorkflowAPIClient: + """获取默认客户端(懒加载)""" + global _default_client + if _default_client is None: + _default_client = WorkflowAPIClient() + return _default_client + + +def get_workflow_by_id(workflow_id: str, base_url: Optional[str] = None, token: Optional[str] = None) -> Dict[str, Any]: + """ + 便捷函数:根据工作流ID获取工作流信息 + + Args: + workflow_id: 工作流ID + base_url: API基础URL(可选) + token: 认证令牌(可选) + + Returns: + API响应数据 + """ + if base_url or token: + with WorkflowAPIClient(base_url=base_url, token=token) as client: + return client.get_workflow_by_id(workflow_id) + else: + return get_default_client().get_workflow_by_id(workflow_id) + + +def execute_workflow( + request_data: Dict[str, Any], + base_url: Optional[str] = None, + token: Optional[str] = None, + timeout: float = WORKFLOW_EXECUTE_TIMEOUT +) -> Dict[str, Any]: + """ + 便捷函数:执行工作流 + + Args: + request_data: 请求数据 + base_url: API基础URL(可选) + token: 认证令牌(可选) + timeout: 超时时间(秒),默认300秒(5分钟) + + Returns: + API响应数据 + """ + if base_url or token: + with WorkflowAPIClient(base_url=base_url, token=token, execute_timeout=timeout) as client: + return client.execute_workflow(request_data) + else: + # 更新默认客户端的超时时间 + default_client = get_default_client() + default_client.execute_timeout = timeout + return default_client.execute_workflow(request_data) + + +def process_workflow_response(response: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + 处理API响应数据,映射为工作流配置格式 + + Args: + response: API原始响应数据 + + Returns: + 处理后的工作流配置列表 + """ + try: + data_list = response.get("data", []) + + # 如果 data 是单个对象而不是列表,转换为列表 + if isinstance(data_list, dict): + data_list = [data_list] + + workflows = [] + for workflow in data_list: + # 解析 inputParams 字符串为 JSON 对象 + input_params = workflow.get("inputParams", {}) + if isinstance(input_params, str): + try: + input_params = json.loads(input_params) + except json.JSONDecodeError: + input_params = {} + + # 映射字段 + config = { + "id": workflow.get("id"), + "workflowId": workflow.get("workflowId"), + "workflowName": workflow.get("workflowName") or workflow.get("name"), + "workflowDescription": workflow.get("workflowDescription") or workflow.get("description"), + "inputParams": input_params + } + workflows.append(config) + + logger.info(f"成功处理 {len(workflows)} 条工作流数据") + return workflows + + except Exception as e: + logger.error(f"处理API响应数据失败: {e}", exc_info=True) + raise diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/env_config.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/env_config.py new file mode 100644 index 0000000..eac8f35 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/env_config.py @@ -0,0 +1,85 @@ +"""环境变量配置模块""" + +import os +from typing import Optional + + +def get_workflow_id(default: str = "") -> str: + """ + 获取工作流ID环境变量 + + Args: + default: 默认值(默认为 "") + + Returns: + str: 工作流ID + + Environment Variables: + workflowId: 工作流ID + """ + return os.environ.get("workflowId", default) + + +def get_backend_base_url(default: str = "http://lzwcai-demp-corp-manager:8086") -> str: + """ + 获取后端API基础URL环境变量 + + Args: + default: 默认值(默认为 "http://lzwcai-demp-corp-manager:8086") + + Returns: + str: 后端API基础URL + + Environment Variables: + backendBaseUrl: 后端API基础URL + """ + return os.environ.get("backendBaseUrl", default) + + +def get_workflow_execute_key(default: str = "") -> str: + """ + 获取工作流执行密钥(Token)环境变量 + + Args: + default: 默认值(默认为 "") + + Returns: + str: 工作流执行密钥 + + Environment Variables: + workflowExecuteKey: 工作流执行密钥/Token + """ + return os.environ.get("workflowExecuteKey", default) + + +def get_env_config() -> dict: + """ + 获取所有环境配置 + + Returns: + dict: 包含所有配置的字典 + + Example: + config = get_env_config() + print(config['workflow_id']) # 输出: "" + print(config['backend_base_url']) # 输出: "http://lzwcai-demp-corp-manager:8086" + """ + return { + "workflow_id": get_workflow_id(), + "backend_base_url": get_backend_base_url(), + "workflow_execute_key": get_workflow_execute_key() + } + + +def set_env_variable(key: str, value: str) -> None: + """ + 设置环境变量(仅在当前进程中有效) + + Args: + key: 环境变量名 + value: 环境变量值 + + Example: + set_env_variable("workflowId", "1234567890") + """ + os.environ[key] = value diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/json_helper.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/json_helper.py new file mode 100644 index 0000000..cd6894e --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/json_helper.py @@ -0,0 +1,59 @@ +"""JSON 文件读取工具""" + +import json +from pathlib import Path +from typing import Any, Union + + +def load_json(json_path: Union[str, Path]) -> Any: + """ + 读取 JSON 文件并返回其内容 + + Args: + json_path: JSON 文件的路径(支持字符串或 Path 对象) + + Returns: + JSON 文件中解析后的数据(可以是字典、列表或其他 JSON 类型) + + Raises: + FileNotFoundError: 当文件不存在时 + json.JSONDecodeError: 当 JSON 格式无效时 + Exception: 其他读取错误 + + Example: + >>> data = load_json('config.json') + >>> print(data) + {'key': 'value'} + + >>> data = load_json(Path('data/users.json')) + >>> print(data) + [{'id': 1, 'name': 'Alice'}] + """ + try: + # 转换为 Path 对象 + path = Path(json_path) + + # 检查文件是否存在 + if not path.exists(): + raise FileNotFoundError(f"JSON 文件不存在: {json_path}") + + # 检查是否为文件 + if not path.is_file(): + raise ValueError(f"路径不是一个文件: {json_path}") + + # 读取并解析 JSON 文件 + with open(path, 'r', encoding='utf-8') as f: + data = json.load(f) + + return data + + except json.JSONDecodeError as e: + raise json.JSONDecodeError( + f"JSON 格式错误: {e.msg}", + e.doc, + e.pos + ) + except FileNotFoundError: + raise + except Exception as e: + raise Exception(f"读取 JSON 文件时发生错误: {str(e)}") diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/logger_config.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/logger_config.py new file mode 100644 index 0000000..38efa16 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/logger_config.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +""" +统一日志配置模块 +提供系统级别的日志配置和管理 +""" + +import os +import sys +import logging +from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler +from datetime import datetime +from pathlib import Path + + +class LoggerConfig: + """日志配置管理类""" + + def __init__(self, logs_dir: str = None): + """初始化日志配置 + + Args: + logs_dir: 日志目录路径,默认为项目根目录下的logs文件夹 + """ + if logs_dir: + self.logs_dir = Path(logs_dir) + else: + project_root = Path(__file__).parent.parent + self.logs_dir = project_root / "logs" + + self.logs_dir.mkdir(exist_ok=True) + + self.log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' + self.date_format = '%Y-%m-%d %H:%M:%S' + self.log_level = self._get_log_level_from_env() + self._initialized = False + + def _get_log_level_from_env(self) -> int: + log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper() + level_mapping = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'WARN': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL, + 'FATAL': logging.CRITICAL + } + return level_mapping.get(log_level_str, logging.INFO) + + def setup_logging(self, + app_name: str = "lzwcai_workflow_to_mcp", + log_level: int = logging.INFO, + max_file_size: int = 10 * 1024 * 1024, + backup_count: int = 5, + console_output: bool = True) -> logging.Logger: + if self._initialized: + return logging.getLogger() + + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + formatter = logging.Formatter(self.log_format, self.date_format) + + # 1. 主日志文件 - 按大小滚动 + main_log_file = self.logs_dir / f"{app_name}.log" + file_handler = RotatingFileHandler( + main_log_file, + maxBytes=max_file_size, + backupCount=backup_count, + encoding='utf-8' + ) + file_handler.setLevel(log_level) + file_handler.setFormatter(formatter) + root_logger.addHandler(file_handler) + + # 2. 错误日志文件 + error_log_file = self.logs_dir / f"{app_name}_error.log" + error_handler = RotatingFileHandler( + error_log_file, + maxBytes=max_file_size, + backupCount=backup_count, + encoding='utf-8' + ) + error_handler.setLevel(logging.ERROR) + error_handler.setFormatter(formatter) + root_logger.addHandler(error_handler) + + # 3. 按日期滚动的日志文件 + daily_log_file = self.logs_dir / f"{app_name}_daily.log" + daily_handler = TimedRotatingFileHandler( + daily_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + daily_handler.setLevel(log_level) + daily_handler.setFormatter(formatter) + daily_handler.suffix = "%Y-%m-%d" + root_logger.addHandler(daily_handler) + + # 4. 控制台输出 (MCP协议使用stdio时,必须将日志输出到stderr) + if console_output: + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setLevel(log_level) + console_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + self.date_format + ) + console_handler.setFormatter(console_formatter) + root_logger.addHandler(console_handler) + + self._initialized = True + root_logger.info(f"日志系统初始化完成 - 日志目录: {self.logs_dir}") + + return root_logger + + def get_module_logger(self, module_name: str) -> logging.Logger: + return logging.getLogger(module_name) + + def create_component_logger(self, + component_name: str, + log_file: str = None, + log_level: int = None) -> logging.Logger: + logger = logging.getLogger(component_name) + + if log_file: + component_log_file = self.logs_dir / log_file + handler = RotatingFileHandler( + component_log_file, + maxBytes=5 * 1024 * 1024, + backupCount=3, + encoding='utf-8' + ) + + formatter = logging.Formatter(self.log_format, self.date_format) + handler.setFormatter(formatter) + + if log_level: + handler.setLevel(log_level) + + logger.addHandler(handler) + + return logger + + def setup_mcp_logging(self) -> logging.Logger: + return self.create_component_logger( + "mcp_services", + "mcp_services.log", + logging.DEBUG + ) + + def setup_api_logging(self) -> logging.Logger: + return self.create_component_logger( + "api_requests", + "api_requests.log", + logging.INFO + ) + + +# 全局日志配置实例 +logger_config = LoggerConfig() + + +def setup_system_logging(app_name: str = "lzwcai_workflow_to_mcp", + log_level: int = logging.INFO) -> logging.Logger: + return logger_config.setup_logging(app_name, log_level) + + +def get_logger(name: str) -> logging.Logger: + return logger_config.get_module_logger(name) diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/name_helper.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/name_helper.py new file mode 100644 index 0000000..7a5499d --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/name_helper.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +""" +名称生成工具模块 +""" + +from pypinyin import lazy_pinyin, Style +import logging + +logger = logging.getLogger(__name__) + + +def generate_tool_name(business_name: str, tool_id: str) -> str: + """ + 根据业务名称和ID生成工具名称 + 格式: tool_拼音_id + + Args: + business_name: 业务名称(中文) + tool_id: 工具ID + + Returns: + str: 格式化的工具名称 + """ + try: + # 将中文转换为拼音(无音调,小写) + pinyin_list = lazy_pinyin(business_name, style=Style.NORMAL) + # 拼接拼音 + pinyin_str = ''.join(pinyin_list) + + # 将 ID 中的 '-' 替换为 '_' + formatted_id = tool_id.replace('-', '_') + + # 组合成最终的工具名称 + tool_name = f"workflow_{pinyin_str}_{formatted_id}" + + return tool_name + except Exception as e: + logger.error(f"生成工具名称失败: {business_name}, {tool_id}, 错误: {e}", exc_info=True) + # 降级处理:如果拼音转换失败,使用 ID + return f"workflow_{tool_id.replace('-', '_')}" diff --git a/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/schema_helper.py b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/schema_helper.py new file mode 100644 index 0000000..ee92867 --- /dev/null +++ b/lzwcai_workflow_to_mcp/lzwcai_workflow_to_mcp/utils/schema_helper.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +""" +Schema 生成工具模块 +""" + +from typing import Any, Dict, List + + +def generate_input_schema(parameters: Dict[str, Any]) -> Dict[str, Any]: + """ + 从查询配置的参数定义生成 MCP 工具的 inputSchema + + 此函数会保留完整的 JSON Schema 信息,包括: + - type: Schema 类型(通常是 "object") + - required: 必填字段列表 + - properties: 属性定义(包括每个属性的 type, description, format, examples 等) + - description: Schema 的整体描述(如果有) + - 以及其他任何 JSON Schema 标准字段 + + Args: + parameters: 查询配置中的参数定义字典,应该是一个完整的 JSON Schema 对象 + + Returns: + Dict[str, Any]: 符合 JSON Schema 规范的 inputSchema 对象 + """ + # 如果 parameters 本身就是一个完整的 JSON Schema 对象,直接使用 + # 但确保至少包含 type 和 properties + if not parameters: + # 如果 parameters 为空,返回一个空的 object schema + return { + "type": "object", + "properties": {}, + "required": [] + } + + # 深拷贝 parameters 以避免修改原始数据 + input_schema = dict(parameters) + + # 确保必需的字段存在 + if "type" not in input_schema: + input_schema["type"] = "object" + + if "properties" not in input_schema: + input_schema["properties"] = {} + + if "required" not in input_schema: + input_schema["required"] = [] + + return input_schema + + +def validate_input_schema(schema: Dict[str, Any]) -> tuple[bool, str]: + """ + 验证 inputSchema 是否符合基本的 JSON Schema 规范 + + Args: + schema: 要验证的 schema 对象 + + Returns: + tuple[bool, str]: (是否有效, 错误消息或成功消息) + """ + if not isinstance(schema, dict): + return False, "Schema 必须是一个字典对象" + + if schema.get("type") != "object": + return False, "Schema 的 type 字段必须是 'object'" + + if "properties" not in schema: + return False, "Schema 必须包含 properties 字段" + + if not isinstance(schema.get("properties"), dict): + return False, "Schema 的 properties 字段必须是一个字典对象" + + # 验证 required 字段(如果存在) + if "required" in schema: + required = schema["required"] + if not isinstance(required, list): + return False, "Schema 的 required 字段必须是一个列表" + + # 验证所有 required 的字段都在 properties 中定义 + properties = schema["properties"] + for field in required: + if field not in properties: + return False, f"必填字段 '{field}' 未在 properties 中定义" + + # 验证 properties 中每个字段的定义 + for prop_name, prop_def in schema["properties"].items(): + if not isinstance(prop_def, dict): + return False, f"属性 '{prop_name}' 的定义必须是一个字典对象" + + if "type" not in prop_def: + return False, f"属性 '{prop_name}' 必须包含 type 字段" + + return True, "Schema 验证通过" diff --git a/lzwcai_workflow_to_mcp/main.py b/lzwcai_workflow_to_mcp/main.py new file mode 100644 index 0000000..80107e5 --- /dev/null +++ b/lzwcai_workflow_to_mcp/main.py @@ -0,0 +1,16 @@ +""" +Entry point for lzwcai-workflow-to-mcp +Runs the MCP server for workflow execution +""" + +import os + +if __name__ == "__main__": + # 设置环境变量 + os.environ["workflowId"] = "2002300699510763521" + os.environ["workflowExecuteKey"] = "wf_buh230o9iaea4n6aefsddcexa7p27ydl" + os.environ["backendBaseUrl"] = "http://192.168.2.236:8088" + + # Import and run the actual MCP server + from lzwcai_workflow_to_mcp.main import main + main() diff --git a/lzwcai_workflow_to_mcp/pyproject.toml b/lzwcai_workflow_to_mcp/pyproject.toml new file mode 100644 index 0000000..3dd7ec5 --- /dev/null +++ b/lzwcai_workflow_to_mcp/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "lzwcai-workflow-to-mcp" +version = "0.1.8" +description = "MCP server for executing business SQL queries with dynamic tool generation" +readme = "README.md" +requires-python = ">=3.13" +license = {text = "MIT"} +authors = [ + {name = "lzwcai", email = "your-email@example.com"}, +] +keywords = ["mcp", "sql", "executor", "server"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "httpx>=0.28.1", + "mcp[cli]>=1.10.1", + "pypinyin>=0.53.0", +] + +[project.scripts] +lzwcai-workflow-to-mcp = "lzwcai_workflow_to_mcp.main:main" + +[tool.hatch.build.targets.wheel] +packages = ["lzwcai_workflow_to_mcp"] + +[tool.hatch.build.targets.wheel.force-include] +"lzwcai_workflow_to_mcp/businessQueries.json" = "lzwcai_workflow_to_mcp/businessQueries.json"