feat(lzwcai-workflow-to-mcp): 初始化项目并实现动态 MCP 工具生成功能

- 添加 Python 3.13 版本声明文件 `.python-version`
- 新增 `businessQueries.json`,包含三个 SQL 查询配置:按摘要、科目查凭证及科目余额
- 实现主程序 `main.py` 支持 local 和 api 两种模式加载配置,并注册为 MCP Server 工具
- 创建 `schema_converter.py` 用于将 sqlParams 转换为 MCP 输入 schema
- 配置 `pyproject.toml` 定义项目元数据与依赖(mcp, anyio, requests)
- 引入 utils 包结构,支持 API 请求、环境变量读取、日志配置等辅助功能模块
- 提供完整的工具列表和调用逻辑,通过 stdio 与 MCP 客户端通信执行工作流
This commit is contained in:
2025-12-20 17:49:45 +08:00
parent ec7e7fd7dc
commit 3ee23fe2c4
25 changed files with 1446 additions and 0 deletions

View File

@@ -0,0 +1 @@
3.13

View File

@@ -0,0 +1,80 @@
[
{
"id": "2000851983567065089",
"createBy": "bjt",
"createTime": "2025-12-16 16:54:22",
"updateBy": null,
"updateTime": null,
"serviceId": "2000821852159709186",
"uniqueName": "按摘要查凭证",
"name": "anzhaiyaochapingzheng_91da19f8",
"description": "按照摘要描述查询凭证信息",
"visualizable": 1,
"toolPrompt": "按照摘要描述查询凭证信息",
"toolType": "sql",
"datasourceId": "158",
"sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{digest}' AS digest_keyword,\n '{company_names}' AS company_names_param,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n)\n\nSELECT \n c.company_name AS 公司,\n v.dbill_date AS 日期,\n v.csign || LPAD(v.ino_id::text, 4, '0') AS 凭证号,\n v.cdigest AS 摘要,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n v.md AS 借方,\n v.mc AS 贷方,\n v.cbill AS 制单人\nFROM uf_voucher v\nJOIN uf_company c ON v.company_id = c.id\nJOIN uf_account_code a ON v.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE v.bdelete = false\n AND v.iyear = qp.target_year\n AND v.cdigest LIKE '%' || qp.digest_keyword || '%'\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR v.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, v.dbill_date DESC, v.ino_id;",
"sqlParams": "[{\"type\":\"paragraph\",\"name\":\"digest\",\"displayName\":\"摘要关键字\",\"maxLength\":500,\"defaultValue\":\"\",\"required\":true},{\"type\":\"string\",\"name\":\"company_names\",\"displayName\":\"公司名称\",\"maxLength\":200,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"会计年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"string\",\"name\":\"months\",\"displayName\":\"月份\",\"maxLength\":50,\"defaultValue\":\"\",\"required\":false}]",
"resultType": "list",
"sourceType": "ai",
"trainingTaskId": null,
"tableMetadataIds": "",
"executionCount": 0,
"visualizationConfigs": [],
"inputJsonSchema": "{}",
"outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}",
"lastExecutionTime": null
},
{
"id": "2000852304343240706",
"createBy": "bjt",
"createTime": "2025-12-16 16:55:38",
"updateBy": null,
"updateTime": null,
"serviceId": "2000821852159709186",
"uniqueName": "按科目查凭证",
"name": "ankemuchapingzheng_6a268ea1",
"description": "按照科目名称查询凭证信息",
"visualizable": 1,
"toolPrompt": "按照科目名称查询凭证信息",
"toolType": "sql",
"datasourceId": "158",
"sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{company_names}' AS company_names_param,\n '{subject_name}' AS subject_keyword,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n),\n目标科目 AS (\n SELECT DISTINCT a.id, a.ccode\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.ccode_name LIKE '%' || qp.subject_keyword || '%'\n AND a.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n),\n相关科目 AS (\n SELECT a.id\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.iyear = qp.target_year\n AND EXISTS (\n SELECT 1 FROM 目标科目 t \n WHERE a.ccode LIKE t.ccode || '%'\n )\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n)\nSELECT \n c.company_name AS 公司,\n v.dbill_date AS 日期,\n v.csign || LPAD(v.ino_id::text, 4, '0') AS 凭证号,\n v.cdigest AS 摘要,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n a.igrade AS 级次,\n v.md AS 借方,\n v.mc AS 贷方\nFROM uf_voucher v\nJOIN uf_company c ON v.company_id = c.id\nJOIN uf_account_code a ON v.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE v.bdelete = false\n AND v.account_code_id IN (SELECT id FROM 相关科目)\n AND v.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR v.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, a.ccode, v.dbill_date, v.ino_id;",
"sqlParams": "[{\"type\":\"select\",\"name\":\"subject_name\",\"displayName\":\"科目名称\",\"maxLength\":100,\"defaultValue\":\"\",\"required\":true,\"options\":[\"管理费用\",\"销售费用\",\"财务费用\",\"应收账款\",\"应付账款\"]},{\"type\":\"paragraph\",\"name\":\"company_names\",\"displayName\":\"公司名称列表\",\"maxLength\":300,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"select\",\"name\":\"months\",\"displayName\":\"期间月份\",\"maxLength\":30,\"defaultValue\":\"\",\"required\":false,\"options\":[\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\",\"8\",\"9\",\"10\",\"11\",\"12\"]}]",
"resultType": "list",
"sourceType": "ai",
"trainingTaskId": null,
"tableMetadataIds": "",
"executionCount": 0,
"visualizationConfigs": [],
"inputJsonSchema": "{}",
"outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}",
"lastExecutionTime": null
},
{
"id": "2000852610904920066",
"createBy": "bjt",
"createTime": "2025-12-16 16:56:51",
"updateBy": null,
"updateTime": null,
"serviceId": "2000821852159709186",
"uniqueName": "按科目查余额",
"name": "ankemuchayue_68b302c2",
"description": "按照科目名称查询余额",
"visualizable": 1,
"toolPrompt": "按照科目名称查询余额",
"toolType": "sql",
"datasourceId": "158",
"sqlTemplate": "WITH query_params AS (\n SELECT \n {year}::INT AS target_year,\n '{company_names}' AS company_names_param,\n '{subject_name}' AS subject_keyword,\n '{months}' AS months_param\n),\ncompany_filters AS (\n SELECT \n TRIM(keyword) AS company_keyword\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT company_names_param FROM query_params LIMIT 1), ''), ',')) AS keyword\n ) t\n WHERE keyword IS NOT NULL AND TRIM(keyword) <> ''\n),\nmonth_filters AS (\n SELECT \n month::INT AS target_month\n FROM (\n SELECT unnest(string_to_array(nullif((SELECT months_param FROM query_params LIMIT 1), ''), ',')) AS month\n ) t\n WHERE month IS NOT NULL AND TRIM(month) <> ''\n),\n目标科目 AS (\n SELECT DISTINCT a.id, a.ccode\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.ccode_name LIKE '%' || qp.subject_keyword || '%'\n AND a.iyear = qp.target_year\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n),\n相关科目 AS (\n SELECT a.id\n FROM uf_account_code a\n JOIN uf_company c ON a.company_id = c.id\n CROSS JOIN query_params qp\n WHERE a.iyear = qp.target_year\n AND EXISTS (\n SELECT 1 FROM 目标科目 t \n WHERE a.ccode LIKE t.ccode || '%'\n )\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n)\nSELECT \n c.company_name AS 公司,\n a.ccode AS 科目编码,\n a.ccode_name AS 科目名称,\n a.igrade AS 级次,\n a.cclass AS 类别,\n b.iyear AS 年,\n b.iperiod AS 月,\n b.cbegind_c AS 期初方向,\n b.mb AS 期初余额,\n b.md AS 借方发生,\n b.mc AS 贷方发生,\n b.cendd_c AS 期末方向,\n b.me AS 期末余额\nFROM uf_account_balance b\nJOIN uf_company c ON b.company_id = c.id\nJOIN uf_account_code a ON b.account_code_id = a.id\nCROSS JOIN query_params qp\nWHERE b.iyear = qp.target_year\n AND b.account_code_id IN (SELECT id FROM 相关科目)\n AND (\n NOT EXISTS (SELECT 1 FROM company_filters)\n OR EXISTS (\n SELECT 1 FROM company_filters cf \n WHERE c.company_name LIKE '%' || cf.company_keyword || '%'\n )\n )\n AND (\n NOT EXISTS (SELECT 1 FROM month_filters)\n OR b.iperiod IN (SELECT target_month FROM month_filters)\n )\nORDER BY c.company_name, a.ccode, b.iperiod;",
"sqlParams": "[{\"type\":\"string\",\"name\":\"subject_name\",\"displayName\":\"科目名称关键字\",\"maxLength\":100,\"defaultValue\":\"银行存款\",\"required\":true},{\"type\":\"string\",\"name\":\"company_names\",\"displayName\":\"公司筛选\",\"maxLength\":150,\"defaultValue\":\"\",\"required\":false},{\"type\":\"number\",\"name\":\"year\",\"displayName\":\"查询年度\",\"maxLength\":4,\"defaultValue\":\"2025\",\"required\":true},{\"type\":\"number\",\"name\":\"months\",\"displayName\":\"查询月份\",\"maxLength\":2,\"defaultValue\":\"\",\"required\":false}]",
"resultType": "list",
"sourceType": "ai",
"trainingTaskId": null,
"tableMetadataIds": "",
"executionCount": 0,
"visualizationConfigs": [],
"inputJsonSchema": "{}",
"outputJsonSchema": "{\"type\":\"object\",\"properties\":{\"data\":{\"type\":\"array\"}}}",
"lastExecutionTime": null
}
]

View File

@@ -0,0 +1,296 @@
#!/usr/bin/env python3
"""
Workflow to MCP Server
从 businessQueries.json 或 API 动态生成 MCP 工具
支持两种模式:
- local: 从本地 JSON 文件加载
- api: 从远程 API 加载
"""
import json
import os
import logging
import argparse
import anyio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
try:
from .schema_converter import convert_sql_params_to_input_schema
from .utils.api_client import execute_workflow, get_workflow_by_id
from .utils.env_config import get_workflow_id
from .utils.logger_config import setup_system_logging, get_logger
except ImportError:
from schema_converter import convert_sql_params_to_input_schema
from utils.api_client import execute_workflow, get_workflow_by_id
from utils.env_config import get_workflow_id
from utils.logger_config import setup_system_logging, get_logger
# 初始化日志系统
setup_system_logging(app_name="lzwcai_workflow_to_mcp", log_level=logging.DEBUG)
logger = get_logger(__name__)
# 初始化 MCP Server
server = Server("workflow_mcp_server")
# 全局配置
_tools_config: list[dict] = []
_config: dict = {}
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="Workflow MCP Server")
parser.add_argument(
"--mode",
type=str,
choices=["local", "api"],
default="api",
help="数据加载模式: local(本地JSON) 或 api(远程API默认)"
)
parser.add_argument(
"--json-path",
type=str,
default=None,
help="本地 JSON 文件路径 (local 模式)"
)
parser.add_argument(
"--workflow-id",
type=str,
default=None,
help="工作流ID (api 模式,可选,默认从环境变量 workflowId 获取)"
)
return parser.parse_args()
class DataLoader:
"""数据加载器基类"""
def load(self) -> list[dict]:
raise NotImplementedError
class LocalLoader(DataLoader):
"""本地 JSON 文件加载器"""
def __init__(self, json_path: str = None):
if json_path is None:
json_path = os.path.join(os.path.dirname(__file__), "businessQueries.json")
self.json_path = json_path
def load(self) -> list[dict]:
try:
with open(self.json_path, "r", encoding="utf-8") as f:
data = json.load(f)
logger.info(f"从本地加载 {len(data)} 条配置: {self.json_path}")
return data
except FileNotFoundError:
logger.error(f"配置文件不存在: {self.json_path}")
return []
except json.JSONDecodeError as e:
logger.error(f"JSON 解析错误: {e}")
return []
class ApiLoader(DataLoader):
"""API 远程加载器 - 使用 get_workflow_by_id 获取工作流配置"""
def __init__(self, workflow_id: str = None):
self.workflow_id = workflow_id or get_workflow_id()
logger.debug(f"ApiLoader 初始化工作流ID: {self.workflow_id}")
def load(self) -> list[dict]:
try:
if not self.workflow_id:
logger.error("未提供工作流ID请设置环境变量 workflowId")
return []
logger.info(f"开始从 API 加载工作流配置工作流ID: {self.workflow_id}")
# 调用 get_workflow_by_id 获取工作流配置
response = get_workflow_by_id(self.workflow_id)
logger.debug(f"API 响应原始数据: {json.dumps(response, ensure_ascii=False, indent=2)}")
# 返回格式: {code, data, msg}
if response.get("code") != 200:
logger.error(f"获取工作流配置失败: code={response.get('code')}, msg={response.get('msg')}")
return []
data = response.get("data")
logger.debug(f"API 响应 data 字段: {json.dumps(data, ensure_ascii=False, indent=2) if data else 'None'}")
# data 可能是单个对象或列表
if isinstance(data, dict):
result = [data]
elif isinstance(data, list):
result = data
else:
logger.warning(f"API 响应 data 字段类型异常: {type(data)}")
result = []
logger.info(f"从 API 加载工作流配置成功工作流ID: {self.workflow_id}, 配置数量: {len(result)}")
return result
except Exception as e:
logger.error(f"API 请求失败: {e}", exc_info=True)
return []
def create_loader(mode: str, **kwargs) -> DataLoader:
"""
创建数据加载器
Args:
mode: "local""api"
kwargs:
- json_path: 本地 JSON 路径 (local 模式)
- workflow_id: 工作流ID (api 模式,可选,默认从环境变量获取)
"""
if mode == "local":
return LocalLoader(json_path=kwargs.get("json_path"))
elif mode == "api":
return ApiLoader(workflow_id=kwargs.get("workflow_id"))
else:
raise ValueError(f"不支持的模式: {mode}")
def init_tools(loader: DataLoader):
"""初始化工具配置"""
global _tools_config
_tools_config = loader.load()
logger.info(f"已加载 {len(_tools_config)} 个工具配置")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""列出所有可用工具"""
logger.info(f"收到 ListTools 请求,当前配置数量: {len(_tools_config)}")
tools = []
for query in _tools_config:
name = query.get("name", "")
description = query.get("description") or query.get("toolPrompt") or query.get("uniqueName", "")
sql_params = query.get("sqlParams", "[]")
logger.debug(f"处理工具配置: name={name}, description={description[:50] if description else 'None'}...")
# 转换参数为 inputSchema
input_schema = convert_sql_params_to_input_schema(sql_params)
logger.debug(f"工具 {name} 的 inputSchema: {json.dumps(input_schema, ensure_ascii=False)}")
tools.append(
types.Tool(
name=name,
description=description,
inputSchema=input_schema
)
)
logger.info(f"ListTools 响应: 返回 {len(tools)} 个工具")
return tools
@server.call_tool()
async def handle_call_tool(
name: str,
arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""调用工具"""
logger.info(f"收到 CallTool 请求: name={name}, arguments={json.dumps(arguments, ensure_ascii=False) if arguments else 'None'}")
# 查找对应的工具配置
tool_config = None
for query in _tools_config:
if query.get("name") == name:
tool_config = query
break
if tool_config is None:
logger.error(f"未找到工具配置: {name}")
raise ValueError(f"未知工具: {name}")
logger.debug(f"找到工具配置: {json.dumps(tool_config, ensure_ascii=False, indent=2)}")
# 获取工作流ID
workflow_id = tool_config.get("workflowId") or get_workflow_id()
logger.info(f"使用工作流ID: {workflow_id}")
# 构建请求数据
request_data = {
"workflowId": workflow_id,
"inputs": arguments or {}
}
logger.info(f"执行工作流请求数据: {json.dumps(request_data, ensure_ascii=False, indent=2)}")
try:
# 调用工作流执行API
result = execute_workflow(request_data)
logger.info(f"工作流执行成功: {workflow_id}")
logger.debug(f"工作流执行结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
except Exception as e:
logger.error(f"工作流执行失败: {e}", exc_info=True)
result = {
"error": str(e),
"tool_name": name,
"workflow_id": workflow_id
}
return [
types.TextContent(
type="text",
text=json.dumps(result, ensure_ascii=False, indent=2)
)
]
async def run_server():
"""运行 MCP Server (stdio 模式)"""
async with stdio_server() as streams:
await server.run(
streams[0],
streams[1],
InitializationOptions(
server_name="workflow_mcp_server",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
def main():
"""主入口"""
global _config
logger.info("=" * 50)
logger.info("Workflow MCP Server 启动")
logger.info("=" * 50)
# 解析命令行参数
args = parse_arguments()
_config = vars(args)
logger.info(f"命令行参数: {_config}")
# 创建加载器并初始化工具
logger.info(f"使用模式: {args.mode}")
loader = create_loader(
mode=args.mode,
json_path=args.json_path,
workflow_id=args.workflow_id
)
init_tools(loader)
logger.info("开始运行 MCP Server (stdio 模式)")
# 运行服务器
anyio.run(run_server)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,18 @@
[project]
name = "lzwcai-workflow-to-mcp"
version = "0.1.0"
description = "从 businessQueries 动态生成 MCP 工具"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"mcp>=1.0.0",
"anyio>=4.0.0",
"requests>=2.28.0",
]
[project.scripts]
workflow-mcp = "main:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -0,0 +1,208 @@
"""
Schema 转换器
将 sqlParams 数组格式转换为 MCP 工具需要的 JSON Schema 格式
支持的类型:
- string: 文本输入
- paragraph: 段落/多行文本
- select: 下拉选项
- number: 数字输入
"""
import json
from typing import Any
def convert_param_to_schema_property(param: dict) -> tuple[str, dict, bool]:
"""
将单个参数转换为 JSON Schema property
Args:
param: 参数配置,格式如:
{
"type": "string",
"name": "company_names",
"displayName": "公司名称",
"maxLength": 200,
"defaultValue": "",
"required": true,
"options": ["选项1", "选项2"] # 仅 select 类型
}
Returns:
tuple: (property_name, property_schema, is_required)
"""
param_type = param.get("type", "string")
param_name = param.get("name", "")
display_name = param.get("displayName", param_name)
default_value = param.get("defaultValue", "")
max_length = param.get("maxLength")
is_required = param.get("required", False)
options = param.get("options", [])
property_schema = {
"description": display_name
}
if param_type == "string":
property_schema["type"] = "string"
if max_length:
property_schema["maxLength"] = max_length
elif param_type == "paragraph":
property_schema["type"] = "string"
property_schema["format"] = "paragraph"
if max_length:
property_schema["maxLength"] = max_length
elif param_type == "select":
property_schema["type"] = "string"
if options:
property_schema["enum"] = options
elif param_type == "number":
property_schema["type"] = "number"
else:
# 默认当作 string 处理
property_schema["type"] = "string"
# 添加默认值
if default_value not in (None, ""):
if param_type == "number":
try:
property_schema["default"] = int(default_value) if str(default_value).isdigit() else float(default_value)
except (ValueError, TypeError):
property_schema["default"] = default_value
else:
property_schema["default"] = default_value
return param_name, property_schema, is_required
def convert_sql_params_to_input_schema(sql_params: str | list) -> dict:
"""
将 sqlParams 转换为 MCP 工具的 inputSchema
Args:
sql_params: sqlParams 字段值,可以是 JSON 字符串或已解析的列表
格式: [{"type": "string", "name": "xxx", ...}, ...]
Returns:
dict: MCP 工具的 inputSchema格式如:
{
"type": "object",
"properties": {...},
"required": [...]
}
"""
# 解析 JSON 字符串
if isinstance(sql_params, str):
try:
params_list = json.loads(sql_params)
except json.JSONDecodeError:
return {"type": "object", "properties": {}, "required": []}
else:
params_list = sql_params
if not isinstance(params_list, list):
return {"type": "object", "properties": {}, "required": []}
input_schema = {
"type": "object",
"properties": {},
"required": []
}
for param in params_list:
if not isinstance(param, dict):
continue
name, schema, is_required = convert_param_to_schema_property(param)
if name:
input_schema["properties"][name] = schema
if is_required:
input_schema["required"].append(name)
return input_schema
def convert_business_query_to_tool(query: dict) -> dict:
"""
将单个 businessQuery 转换为 MCP Tool 配置
Args:
query: businessQuery 对象
Returns:
dict: MCP Tool 配置
{
"name": "工具名称",
"description": "工具描述",
"inputSchema": {...}
}
"""
name = query.get("name", "")
description = query.get("description") or query.get("toolPrompt") or query.get("uniqueName", "")
sql_params = query.get("sqlParams", "[]")
input_schema = convert_sql_params_to_input_schema(sql_params)
return {
"name": name,
"description": description,
"inputSchema": input_schema,
# 保留原始数据供后续使用
"_raw": {
"id": query.get("id"),
"uniqueName": query.get("uniqueName"),
"sqlTemplate": query.get("sqlTemplate"),
"datasourceId": query.get("datasourceId"),
"toolType": query.get("toolType"),
}
}
def convert_all_queries_to_tools(queries: list[dict]) -> list[dict]:
"""
将所有 businessQueries 转换为 MCP Tools 列表
Args:
queries: businessQueries 列表
Returns:
list: MCP Tools 配置列表
"""
tools = []
for query in queries:
tool = convert_business_query_to_tool(query)
if tool["name"]:
tools.append(tool)
return tools
# 测试用
if __name__ == "__main__":
# 测试单个参数转换
test_param = {
"type": "select",
"name": "subject_name",
"displayName": "科目名称",
"maxLength": 100,
"defaultValue": "",
"required": True,
"options": ["管理费用", "销售费用", "财务费用"]
}
name, schema, required = convert_param_to_schema_property(test_param)
print(f"参数名: {name}")
print(f"Schema: {json.dumps(schema, ensure_ascii=False, indent=2)}")
print(f"必填: {required}")
print()
# 测试完整 sqlParams 转换
test_sql_params = '[{"type":"paragraph","name":"digest","displayName":"摘要关键字","maxLength":500,"defaultValue":"","required":true},{"type":"string","name":"company_names","displayName":"公司名称","maxLength":200,"defaultValue":"","required":false},{"type":"number","name":"year","displayName":"会计年度","maxLength":4,"defaultValue":"2025","required":true}]'
input_schema = convert_sql_params_to_input_schema(test_sql_params)
print("InputSchema:")
print(json.dumps(input_schema, ensure_ascii=False, indent=2))

View File

@@ -0,0 +1,33 @@
"""Utils package for lzwcai_workflow_to_mcp"""
from .json_helper import load_json
from .name_helper import generate_tool_name
from .schema_helper import generate_input_schema, validate_input_schema
from .api_client import (
WorkflowAPIClient,
get_workflow_by_id,
execute_workflow,
process_workflow_response,
get_default_client,
DEFAULT_TIMEOUT,
WORKFLOW_EXECUTE_TIMEOUT
)
from .env_config import get_workflow_id, get_backend_base_url, get_env_config, set_env_variable
__all__ = [
'load_json',
'generate_tool_name',
'generate_input_schema',
'validate_input_schema',
'WorkflowAPIClient',
'get_workflow_by_id',
'execute_workflow',
'process_workflow_response',
'get_default_client',
'DEFAULT_TIMEOUT',
'WORKFLOW_EXECUTE_TIMEOUT',
'get_workflow_id',
'get_backend_base_url',
'get_env_config',
'set_env_variable'
]

View File

@@ -0,0 +1,307 @@
"""
工作流API调用客户端
用于调用工作流管理和执行接口
"""
import httpx
import json
from typing import Dict, Any, Optional, List
try:
from .env_config import get_backend_base_url, get_workflow_execute_key
from .logger_config import get_logger
except ImportError:
from env_config import get_backend_base_url, get_workflow_execute_key
from logger_config import get_logger
logger = get_logger(__name__)
# 默认超时配置(秒)
DEFAULT_TIMEOUT = 30.0 # 普通请求超时
WORKFLOW_EXECUTE_TIMEOUT = 300.0 # 工作流执行超时5分钟
class WorkflowAPIClient:
"""工作流API客户端"""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
default_timeout: float = DEFAULT_TIMEOUT,
execute_timeout: float = WORKFLOW_EXECUTE_TIMEOUT
):
"""
初始化API客户端
Args:
base_url: API基础URL默认从环境变量 backendBaseUrl 读取)
token: 认证令牌(默认从环境变量 workflowExecuteKey 读取)
default_timeout: 普通请求超时时间默认30秒
execute_timeout: 工作流执行超时时间默认300秒5分钟
"""
if base_url is None:
base_url = get_backend_base_url()
if token is None:
token = get_workflow_execute_key()
self.base_url = base_url.rstrip('/')
self.token = token
self.default_timeout = default_timeout
self.execute_timeout = execute_timeout
self._client: Optional[httpx.Client] = None
logger.debug(f"WorkflowAPIClient 初始化: base_url={self.base_url}, token={'*' * 10 if self.token else 'None'}, execute_timeout={self.execute_timeout}s")
@property
def client(self) -> httpx.Client:
"""懒加载 HTTP 客户端"""
if self._client is None:
self._client = httpx.Client(timeout=self.default_timeout)
return self._client
def _get_headers(self) -> Dict[str, str]:
"""获取请求头"""
return {
'X-API-Key': f'Bearer {self.token}',
}
def get_workflow_by_id(self, workflow_id: str) -> Dict[str, Any]:
"""
根据工作流ID获取工作流信息
Args:
workflow_id: 工作流ID
Returns:
API响应数据
Raises:
Exception: 请求失败时抛出
"""
url = f"{self.base_url}/system/workflowManage/getByWorkflowId/{workflow_id}"
try:
logger.info(f"[API请求] GET {url}")
logger.debug(f"[API请求] Headers: {self._get_headers()}")
response = self.client.get(
url,
headers=self._get_headers()
)
logger.info(f"[API响应] HTTP {response.status_code}")
logger.debug(f"[API响应] Headers: {dict(response.headers)}")
response.raise_for_status()
data = response.json()
logger.info(f"[API响应] 获取工作流配置成功: workflow_id={workflow_id}")
logger.debug(f"[API响应] Body: {json.dumps(data, ensure_ascii=False, indent=2)}")
return data
except httpx.TimeoutException:
error_msg = f"API请求超时: {url}"
logger.error(f"[API错误] {error_msg}")
raise Exception(error_msg)
except httpx.HTTPStatusError as e:
error_msg = f"API请求失败 (HTTP {e.response.status_code}): {url}"
logger.error(f"[API错误] {error_msg}")
logger.error(f"[API错误] 响应内容: {e.response.text}")
raise Exception(error_msg)
except httpx.RequestError as e:
error_msg = f"API请求异常: {url}, 错误: {str(e)}"
logger.error(f"[API错误] {error_msg}")
raise Exception(error_msg)
except Exception as e:
error_msg = f"处理API响应时出错: {str(e)}"
logger.error(f"[API错误] {error_msg}", exc_info=True)
raise Exception(error_msg)
def execute_workflow(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""
执行工作流
Args:
request_data: 请求数据,包含工作流执行所需的参数
Returns:
API响应数据
Raises:
Exception: 请求失败时抛出
"""
url = f"{self.base_url}/open/workflow/execute"
try:
headers = self._get_headers()
headers['Content-Type'] = 'application/json'
headers['Accept'] = '*/*'
logger.info(f"[API请求] POST {url} (超时: {self.execute_timeout}s)")
logger.debug(f"[API请求] Headers: {headers}")
logger.debug(f"[API请求] Body: {json.dumps(request_data, ensure_ascii=False, indent=2)}")
# 使用更长的超时时间执行工作流
response = self.client.post(
url,
headers=headers,
json=request_data,
timeout=self.execute_timeout # 使用工作流执行专用超时时间
)
logger.info(f"[API响应] HTTP {response.status_code}")
logger.debug(f"[API响应] Headers: {dict(response.headers)}")
response.raise_for_status()
data = response.json()
logger.info(f"[API响应] 执行工作流成功")
logger.debug(f"[API响应] Body: {json.dumps(data, ensure_ascii=False, indent=2)}")
return data
except httpx.TimeoutException:
error_msg = f"执行工作流API请求超时: {url}"
logger.error(f"[API错误] {error_msg}")
raise Exception(error_msg)
except httpx.HTTPStatusError as e:
error_msg = f"执行工作流API请求失败 (HTTP {e.response.status_code}): {url}"
logger.error(f"[API错误] {error_msg}")
logger.error(f"[API错误] 响应内容: {e.response.text}")
raise Exception(error_msg)
except httpx.RequestError as e:
error_msg = f"执行工作流API请求异常: {url}, 错误: {str(e)}"
logger.error(f"[API错误] {error_msg}")
raise Exception(error_msg)
except Exception as e:
error_msg = f"处理执行工作流API响应时出错: {str(e)}"
logger.error(f"[API错误] {error_msg}", exc_info=True)
raise Exception(error_msg)
def close(self):
"""关闭HTTP客户端"""
if self._client is not None:
self._client.close()
self._client = None
def __enter__(self):
"""支持 context manager"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出时关闭客户端"""
self.close()
return False
# 懒加载的默认客户端
_default_client: Optional[WorkflowAPIClient] = None
def get_default_client() -> WorkflowAPIClient:
"""获取默认客户端(懒加载)"""
global _default_client
if _default_client is None:
_default_client = WorkflowAPIClient()
return _default_client
def get_workflow_by_id(workflow_id: str, base_url: Optional[str] = None, token: Optional[str] = None) -> Dict[str, Any]:
"""
便捷函数根据工作流ID获取工作流信息
Args:
workflow_id: 工作流ID
base_url: API基础URL可选
token: 认证令牌(可选)
Returns:
API响应数据
"""
if base_url or token:
with WorkflowAPIClient(base_url=base_url, token=token) as client:
return client.get_workflow_by_id(workflow_id)
else:
return get_default_client().get_workflow_by_id(workflow_id)
def execute_workflow(
request_data: Dict[str, Any],
base_url: Optional[str] = None,
token: Optional[str] = None,
timeout: float = WORKFLOW_EXECUTE_TIMEOUT
) -> Dict[str, Any]:
"""
便捷函数:执行工作流
Args:
request_data: 请求数据
base_url: API基础URL可选
token: 认证令牌(可选)
timeout: 超时时间默认300秒5分钟
Returns:
API响应数据
"""
if base_url or token:
with WorkflowAPIClient(base_url=base_url, token=token, execute_timeout=timeout) as client:
return client.execute_workflow(request_data)
else:
# 更新默认客户端的超时时间
default_client = get_default_client()
default_client.execute_timeout = timeout
return default_client.execute_workflow(request_data)
def process_workflow_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
处理API响应数据映射为工作流配置格式
Args:
response: API原始响应数据
Returns:
处理后的工作流配置列表
"""
try:
data_list = response.get("data", [])
# 如果 data 是单个对象而不是列表,转换为列表
if isinstance(data_list, dict):
data_list = [data_list]
workflows = []
for workflow in data_list:
# 解析 inputParams 字符串为 JSON 对象
input_params = workflow.get("inputParams", {})
if isinstance(input_params, str):
try:
input_params = json.loads(input_params)
except json.JSONDecodeError:
input_params = {}
# 映射字段
config = {
"id": workflow.get("id"),
"workflowId": workflow.get("workflowId"),
"workflowName": workflow.get("workflowName") or workflow.get("name"),
"workflowDescription": workflow.get("workflowDescription") or workflow.get("description"),
"inputParams": input_params
}
workflows.append(config)
logger.info(f"成功处理 {len(workflows)} 条工作流数据")
return workflows
except Exception as e:
logger.error(f"处理API响应数据失败: {e}", exc_info=True)
raise

View File

@@ -0,0 +1,85 @@
"""环境变量配置模块"""
import os
from typing import Optional
def get_workflow_id(default: str = "") -> str:
"""
获取工作流ID环境变量
Args:
default: 默认值(默认为 ""
Returns:
str: 工作流ID
Environment Variables:
workflowId: 工作流ID
"""
return os.environ.get("workflowId", default)
def get_backend_base_url(default: str = "http://lzwcai-demp-corp-manager:8086") -> str:
"""
获取后端API基础URL环境变量
Args:
default: 默认值(默认为 "http://lzwcai-demp-corp-manager:8086"
Returns:
str: 后端API基础URL
Environment Variables:
backendBaseUrl: 后端API基础URL
"""
return os.environ.get("backendBaseUrl", default)
def get_workflow_execute_key(default: str = "") -> str:
"""
获取工作流执行密钥Token环境变量
Args:
default: 默认值(默认为 ""
Returns:
str: 工作流执行密钥
Environment Variables:
workflowExecuteKey: 工作流执行密钥/Token
"""
return os.environ.get("workflowExecuteKey", default)
def get_env_config() -> dict:
"""
获取所有环境配置
Returns:
dict: 包含所有配置的字典
Example:
config = get_env_config()
print(config['workflow_id']) # 输出: ""
print(config['backend_base_url']) # 输出: "http://lzwcai-demp-corp-manager:8086"
"""
return {
"workflow_id": get_workflow_id(),
"backend_base_url": get_backend_base_url(),
"workflow_execute_key": get_workflow_execute_key()
}
def set_env_variable(key: str, value: str) -> None:
"""
设置环境变量(仅在当前进程中有效)
Args:
key: 环境变量名
value: 环境变量值
Example:
set_env_variable("workflowId", "1234567890")
"""
os.environ[key] = value

View File

@@ -0,0 +1,59 @@
"""JSON 文件读取工具"""
import json
from pathlib import Path
from typing import Any, Union
def load_json(json_path: Union[str, Path]) -> Any:
"""
读取 JSON 文件并返回其内容
Args:
json_path: JSON 文件的路径(支持字符串或 Path 对象)
Returns:
JSON 文件中解析后的数据(可以是字典、列表或其他 JSON 类型)
Raises:
FileNotFoundError: 当文件不存在时
json.JSONDecodeError: 当 JSON 格式无效时
Exception: 其他读取错误
Example:
>>> data = load_json('config.json')
>>> print(data)
{'key': 'value'}
>>> data = load_json(Path('data/users.json'))
>>> print(data)
[{'id': 1, 'name': 'Alice'}]
"""
try:
# 转换为 Path 对象
path = Path(json_path)
# 检查文件是否存在
if not path.exists():
raise FileNotFoundError(f"JSON 文件不存在: {json_path}")
# 检查是否为文件
if not path.is_file():
raise ValueError(f"路径不是一个文件: {json_path}")
# 读取并解析 JSON 文件
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except json.JSONDecodeError as e:
raise json.JSONDecodeError(
f"JSON 格式错误: {e.msg}",
e.doc,
e.pos
)
except FileNotFoundError:
raise
except Exception as e:
raise Exception(f"读取 JSON 文件时发生错误: {str(e)}")

View File

@@ -0,0 +1,174 @@
# -*- coding: utf-8 -*-
"""
统一日志配置模块
提供系统级别的日志配置和管理
"""
import os
import sys
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from datetime import datetime
from pathlib import Path
class LoggerConfig:
"""日志配置管理类"""
def __init__(self, logs_dir: str = None):
"""初始化日志配置
Args:
logs_dir: 日志目录路径默认为项目根目录下的logs文件夹
"""
if logs_dir:
self.logs_dir = Path(logs_dir)
else:
project_root = Path(__file__).parent.parent
self.logs_dir = project_root / "logs"
self.logs_dir.mkdir(exist_ok=True)
self.log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
self.date_format = '%Y-%m-%d %H:%M:%S'
self.log_level = self._get_log_level_from_env()
self._initialized = False
def _get_log_level_from_env(self) -> int:
log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
level_mapping = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'WARN': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
'FATAL': logging.CRITICAL
}
return level_mapping.get(log_level_str, logging.INFO)
def setup_logging(self,
app_name: str = "lzwcai_workflow_to_mcp",
log_level: int = logging.INFO,
max_file_size: int = 10 * 1024 * 1024,
backup_count: int = 5,
console_output: bool = True) -> logging.Logger:
if self._initialized:
return logging.getLogger()
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
formatter = logging.Formatter(self.log_format, self.date_format)
# 1. 主日志文件 - 按大小滚动
main_log_file = self.logs_dir / f"{app_name}.log"
file_handler = RotatingFileHandler(
main_log_file,
maxBytes=max_file_size,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# 2. 错误日志文件
error_log_file = self.logs_dir / f"{app_name}_error.log"
error_handler = RotatingFileHandler(
error_log_file,
maxBytes=max_file_size,
backupCount=backup_count,
encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
root_logger.addHandler(error_handler)
# 3. 按日期滚动的日志文件
daily_log_file = self.logs_dir / f"{app_name}_daily.log"
daily_handler = TimedRotatingFileHandler(
daily_log_file,
when='midnight',
interval=1,
backupCount=30,
encoding='utf-8'
)
daily_handler.setLevel(log_level)
daily_handler.setFormatter(formatter)
daily_handler.suffix = "%Y-%m-%d"
root_logger.addHandler(daily_handler)
# 4. 控制台输出 (MCP协议使用stdio时必须将日志输出到stderr)
if console_output:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(log_level)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
self.date_format
)
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
self._initialized = True
root_logger.info(f"日志系统初始化完成 - 日志目录: {self.logs_dir}")
return root_logger
def get_module_logger(self, module_name: str) -> logging.Logger:
return logging.getLogger(module_name)
def create_component_logger(self,
component_name: str,
log_file: str = None,
log_level: int = None) -> logging.Logger:
logger = logging.getLogger(component_name)
if log_file:
component_log_file = self.logs_dir / log_file
handler = RotatingFileHandler(
component_log_file,
maxBytes=5 * 1024 * 1024,
backupCount=3,
encoding='utf-8'
)
formatter = logging.Formatter(self.log_format, self.date_format)
handler.setFormatter(formatter)
if log_level:
handler.setLevel(log_level)
logger.addHandler(handler)
return logger
def setup_mcp_logging(self) -> logging.Logger:
return self.create_component_logger(
"mcp_services",
"mcp_services.log",
logging.DEBUG
)
def setup_api_logging(self) -> logging.Logger:
return self.create_component_logger(
"api_requests",
"api_requests.log",
logging.INFO
)
# 全局日志配置实例
logger_config = LoggerConfig()
def setup_system_logging(app_name: str = "lzwcai_workflow_to_mcp",
log_level: int = logging.INFO) -> logging.Logger:
return logger_config.setup_logging(app_name, log_level)
def get_logger(name: str) -> logging.Logger:
return logger_config.get_module_logger(name)

View File

@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
名称生成工具模块
"""
from pypinyin import lazy_pinyin, Style
import logging
logger = logging.getLogger(__name__)
def generate_tool_name(business_name: str, tool_id: str) -> str:
"""
根据业务名称和ID生成工具名称
格式: tool_拼音_id
Args:
business_name: 业务名称(中文)
tool_id: 工具ID
Returns:
str: 格式化的工具名称
"""
try:
# 将中文转换为拼音(无音调,小写)
pinyin_list = lazy_pinyin(business_name, style=Style.NORMAL)
# 拼接拼音
pinyin_str = ''.join(pinyin_list)
# 将 ID 中的 '-' 替换为 '_'
formatted_id = tool_id.replace('-', '_')
# 组合成最终的工具名称
tool_name = f"workflow_{pinyin_str}_{formatted_id}"
return tool_name
except Exception as e:
logger.error(f"生成工具名称失败: {business_name}, {tool_id}, 错误: {e}", exc_info=True)
# 降级处理:如果拼音转换失败,使用 ID
return f"workflow_{tool_id.replace('-', '_')}"

View File

@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
"""
Schema 生成工具模块
"""
from typing import Any, Dict, List
def generate_input_schema(parameters: Dict[str, Any]) -> Dict[str, Any]:
"""
从查询配置的参数定义生成 MCP 工具的 inputSchema
此函数会保留完整的 JSON Schema 信息,包括:
- type: Schema 类型(通常是 "object"
- required: 必填字段列表
- properties: 属性定义(包括每个属性的 type, description, format, examples 等)
- description: Schema 的整体描述(如果有)
- 以及其他任何 JSON Schema 标准字段
Args:
parameters: 查询配置中的参数定义字典,应该是一个完整的 JSON Schema 对象
Returns:
Dict[str, Any]: 符合 JSON Schema 规范的 inputSchema 对象
"""
# 如果 parameters 本身就是一个完整的 JSON Schema 对象,直接使用
# 但确保至少包含 type 和 properties
if not parameters:
# 如果 parameters 为空,返回一个空的 object schema
return {
"type": "object",
"properties": {},
"required": []
}
# 深拷贝 parameters 以避免修改原始数据
input_schema = dict(parameters)
# 确保必需的字段存在
if "type" not in input_schema:
input_schema["type"] = "object"
if "properties" not in input_schema:
input_schema["properties"] = {}
if "required" not in input_schema:
input_schema["required"] = []
return input_schema
def validate_input_schema(schema: Dict[str, Any]) -> tuple[bool, str]:
"""
验证 inputSchema 是否符合基本的 JSON Schema 规范
Args:
schema: 要验证的 schema 对象
Returns:
tuple[bool, str]: (是否有效, 错误消息或成功消息)
"""
if not isinstance(schema, dict):
return False, "Schema 必须是一个字典对象"
if schema.get("type") != "object":
return False, "Schema 的 type 字段必须是 'object'"
if "properties" not in schema:
return False, "Schema 必须包含 properties 字段"
if not isinstance(schema.get("properties"), dict):
return False, "Schema 的 properties 字段必须是一个字典对象"
# 验证 required 字段(如果存在)
if "required" in schema:
required = schema["required"]
if not isinstance(required, list):
return False, "Schema 的 required 字段必须是一个列表"
# 验证所有 required 的字段都在 properties 中定义
properties = schema["properties"]
for field in required:
if field not in properties:
return False, f"必填字段 '{field}' 未在 properties 中定义"
# 验证 properties 中每个字段的定义
for prop_name, prop_def in schema["properties"].items():
if not isinstance(prop_def, dict):
return False, f"属性 '{prop_name}' 的定义必须是一个字典对象"
if "type" not in prop_def:
return False, f"属性 '{prop_name}' 必须包含 type 字段"
return True, "Schema 验证通过"

View File

@@ -0,0 +1,16 @@
"""
Entry point for lzwcai-workflow-to-mcp
Runs the MCP server for workflow execution
"""
import os
if __name__ == "__main__":
# 设置环境变量
os.environ["workflowId"] = "2002300699510763521"
os.environ["workflowExecuteKey"] = "wf_buh230o9iaea4n6aefsddcexa7p27ydl"
os.environ["backendBaseUrl"] = "http://192.168.2.236:8088"
# Import and run the actual MCP server
from lzwcai_workflow_to_mcp.main import main
main()

View File

@@ -0,0 +1,35 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "lzwcai-workflow-to-mcp"
version = "0.1.8"
description = "MCP server for executing business SQL queries with dynamic tool generation"
readme = "README.md"
requires-python = ">=3.13"
license = {text = "MIT"}
authors = [
{name = "lzwcai", email = "your-email@example.com"},
]
keywords = ["mcp", "sql", "executor", "server"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.13",
]
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.10.1",
"pypinyin>=0.53.0",
]
[project.scripts]
lzwcai-workflow-to-mcp = "lzwcai_workflow_to_mcp.main:main"
[tool.hatch.build.targets.wheel]
packages = ["lzwcai_workflow_to_mcp"]
[tool.hatch.build.targets.wheel.force-include]
"lzwcai_workflow_to_mcp/businessQueries.json" = "lzwcai_workflow_to_mcp/businessQueries.json"