feat(lzwcai-agile-db): 更新AgileDB技能至v0.4.2版本并扩展工具集

- 将技能版本从0.2.0升级至0.4.2
- 工具数量从33个扩展至57个,新增数据源管理、AI训练、库表关联配置等功能
- 新增MQTT字段关联同步模块(8个工具)和库表关联配置(3个工具)
- 添加重要的契约提示和安全确认原则,包括target默认值、alter_table操作限制等
- 修正工具参数说明,如execute_sql的executableSql改为sql,参数结构优化
- 增强安全机制,明确危险操作的用户确认流程和目标资源选择规则
- 更新README.md中的工具数量统计和功能描述
This commit is contained in:
2026-06-17 14:40:43 +08:00
parent 557361632c
commit ba5cd4bbe1
115 changed files with 7587 additions and 575 deletions

View File

@@ -0,0 +1 @@
3.12

View File

@@ -0,0 +1,75 @@
# lzwcai-mcp-agile-db-third
AgileDB 数据源管理 MCP Server基于 `API_DOCUMENTATION.md` 将后端数据源/连接/DDL/DML/API 接口封装为 34 个 MCP 工具。
## 功能概述
本服务将后端 `datasource` 模块的 API 接口代理为标准 MCP 工具,分为两大类:
### 1. 数据源配置管理
- `list_datasource_configs`:查询数据源配置列表
- `get_datasource_config`:获取数据源配置详情
- `batch_create_datasource_configs`:批量创建数据源配置
- `replace_datasource_configs`:全量替换数据源配置
- `batch_update_datasource_configs`:批量修改数据源配置
- `delete_datasource_configs`:批量删除数据源配置
- `test_connection_config`:测试数据库连接
- `change_datasource_status`:修改数据源状态
- `export_datasource_configs`:导出数据源配置为 Excel
### 2. 数据库连接实例管理
- `list_connections` / `get_connection` / `create_connection` / `update_connection` / `delete_connection`:连接实例 CRUD
- `test_connection` / `change_connection_status`:连接测试与状态切换
- `realtime_structure` / `realtime_databases` / `realtime_tables`:实时查询库表结构
- `create_builtin_postgresql` / `update_builtin_database`:内置 PostgreSQL 连接管理
- `execute_sql`:执行原生 SQL
- `create_database` / `create_table` / `create_database_table` / `alter_database` / `alter_table`DDL 操作
- `generate_table`AI 生成表结构
- `import_document_preview` / `import_document_confirm`Excel/CSV 文档导入
- `builtin_table_data` / `builtin_table_insert` / `builtin_table_update` / `builtin_table_delete`:表数据 CRUD
## 环境配置
| 环境变量 | 说明 | 默认值 |
|----------|------|--------|
| `backendBaseUrl` | 后端 API 基础地址 | `http://lzwcai-demp-corp-manager:8086` |
| `datasourceApiKey` | 默认 `X-Datasource-API-Key`,可选 | 空 |
| `LOG_LEVEL` | 日志级别 | `INFO` |
## 安装与运行
使用 uv
```bash
uv sync
uv run python -m lzwcai_mcp_agile_db_third.main
```
或使用 pip
```bash
python -m venv .venv
.venv\Scripts\python -m pip install mcp httpx
.venv\Scripts\python -m lzwcai_mcp_agile_db_third.main
```
安装为命令后:
```bash
lzwcai-mcp-agile-db-third
```
## 使用 mcp CLI
```bash
mcp dev lzwcai_mcp_agile_db_third/main.py
```
## 注意事项
- 所有写操作(创建/修改/删除)会先落到 `prod` 环境;带 `target` 参数的工具可切换为 `test`
- `import_document_preview` 通过本地文件路径上传 Excel/CSV
- 执行删除类工具前,调用方应遵循安全确认原则,向用户展示影响范围并二次确认
- 日志文件中会对 `password``apiKey``token``secret` 等敏感字段进行脱敏

View File

@@ -0,0 +1,21 @@
# Python
__pycache__/
*.py[cod]
*.egg-info/
*.egg
# Virtual environments
.venv/
venv/
# Logs
logs/
*.log
# IDE
.vscode/
.idea/
# OS
.DS_Store
Thumbs.db

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,75 @@
# lzwcai-mcp-agile-db-third
AgileDB 数据源管理 MCP Server基于 `API_DOCUMENTATION.md` 将后端数据源/连接/DDL/DML/API 接口封装为 34 个 MCP 工具。
## 功能概述
本服务将后端 `datasource` 模块的 API 接口代理为标准 MCP 工具,分为两大类:
### 1. 数据源配置管理
- `list_datasource_configs`:查询数据源配置列表
- `get_datasource_config`:获取数据源配置详情
- `batch_create_datasource_configs`:批量创建数据源配置
- `replace_datasource_configs`:全量替换数据源配置
- `batch_update_datasource_configs`:批量修改数据源配置
- `delete_datasource_configs`:批量删除数据源配置
- `test_connection_config`:测试数据库连接
- `change_datasource_status`:修改数据源状态
- `export_datasource_configs`:导出数据源配置为 Excel
### 2. 数据库连接实例管理
- `list_connections` / `get_connection` / `create_connection` / `update_connection` / `delete_connection`:连接实例 CRUD
- `test_connection` / `change_connection_status`:连接测试与状态切换
- `realtime_structure` / `realtime_databases` / `realtime_tables`:实时查询库表结构
- `create_builtin_postgresql` / `update_builtin_database`:内置 PostgreSQL 连接管理
- `execute_sql`:执行原生 SQL
- `create_database` / `create_table` / `create_database_table` / `alter_database` / `alter_table`DDL 操作
- `generate_table`AI 生成表结构
- `import_document_preview` / `import_document_confirm`Excel/CSV 文档导入
- `builtin_table_data` / `builtin_table_insert` / `builtin_table_update` / `builtin_table_delete`:表数据 CRUD
## 环境配置
| 环境变量 | 说明 | 默认值 |
|----------|------|--------|
| `backendBaseUrl` | 后端 API 基础地址 | `http://lzwcai-demp-corp-manager:8086` |
| `datasourceApiKey` | 默认 `X-Datasource-API-Key`,可选 | 空 |
| `LOG_LEVEL` | 日志级别 | `INFO` |
## 安装与运行
使用 uv
```bash
uv sync
uv run python -m lzwcai_mcp_agile_db_third.main
```
或使用 pip
```bash
python -m venv .venv
.venv\Scripts\python -m pip install mcp httpx
.venv\Scripts\python -m lzwcai_mcp_agile_db_third.main
```
安装为命令后:
```bash
lzwcai-mcp-agile-db-third
```
## 使用 mcp CLI
```bash
mcp dev lzwcai_mcp_agile_db_third/main.py
```
## 注意事项
- 所有写操作(创建/修改/删除)会先落到 `prod` 环境;带 `target` 参数的工具可切换为 `test`
- `import_document_preview` 通过本地文件路径上传 Excel/CSV
- 执行删除类工具前,调用方应遵循安全确认原则,向用户展示影响范围并二次确认
- 日志文件中会对 `password``apiKey``token``secret` 等敏感字段进行脱敏

View File

@@ -0,0 +1 @@
"""lzwcai_mcp_agile_db_third package."""

View File

@@ -0,0 +1,216 @@
"""MCP server entrypoint for Agile DB third-party datasource APIs."""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
try:
from .tools import list_tools, get_tool
from .utils import DataSourceAPIClient, get_env_config, get_api_key
from .utils.logger_config import logger_config
except ImportError:
from tools import list_tools, get_tool
from utils import DataSourceAPIClient, get_env_config, get_api_key
from utils.logger_config import logger_config
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
import mcp.types as types
mcp_logger = logger_config.setup_mcp_logging()
def _text_response(payload: Dict[str, Any]) -> List[types.TextContent]:
"""Build a JSON text response."""
return [
types.TextContent(
type="text",
text=json.dumps(payload, ensure_ascii=False, indent=2),
)
]
def _build_tool(tool_def: Dict[str, Any]) -> types.Tool:
"""Build an MCP Tool from a tool definition."""
return types.Tool(
name=tool_def["name"],
description=tool_def["description"],
inputSchema=tool_def["inputSchema"],
)
server = Server("lzwcai-mcp-agile-db-third")
_api_client: Optional[DataSourceAPIClient] = None
def _get_api_client() -> DataSourceAPIClient:
"""Get or create the default API client."""
global _api_client
if _api_client is None:
env = get_env_config()
_api_client = DataSourceAPIClient(base_url=env.get("backend_base_url"))
return _api_client
@server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
"""List all available MCP tools."""
try:
mcp_logger.info("收到列出工具请求")
tools = [_build_tool(tool_def) for tool_def in list_tools()]
mcp_logger.info(f"成功生成 {len(tools)} 个 MCP 工具")
return tools
except Exception as e:
mcp_logger.error(f"列出工具失败: {e}", exc_info=True)
raise
@server.call_tool()
async def handle_call_tool(
name: str,
arguments: Optional[Dict[str, Any]],
) -> List[types.TextContent]:
"""Handle MCP tool invocation by proxying to the backend API."""
try:
mcp_logger.info(f"收到工具调用请求: {name}")
mcp_logger.debug(f"工具参数: {arguments}")
tool_def = get_tool(name)
if tool_def is None:
error_msg = f"未找到工具: {name}"
mcp_logger.warning(error_msg)
return _text_response({"error": error_msg})
# Validate required parameters
args = arguments or {}
schema = tool_def["inputSchema"]
required = schema.get("required", [])
missing = [key for key in required if key not in args or args[key] is None]
if missing:
error_msg = f"工具 {name} 缺少必填参数: {', '.join(missing)}"
mcp_logger.warning(error_msg)
return _text_response({"error": error_msg, "missing": missing})
# Categorize parameters
path_params: Dict[str, Any] = {}
query_params: Dict[str, Any] = {}
body_params: Dict[str, Any] = {}
file_path: Optional[str] = None
api_key: Optional[str] = None
categories = tool_def["paramCategories"]
for key, value in args.items():
if value is None:
continue
category = categories.get(key)
if category == "path":
path_params[key] = value
elif category == "query":
query_params[key] = value
elif category == "body":
body_params[key] = value
elif category == "file":
file_path = value
elif category == "header":
api_key = value
# Apply schema-defined defaults for parameters the caller omitted,
# so "有默认值的参数不填就用默认值"(如 datasourceType、target、分页
properties = schema.get("properties", {})
for key, prop in properties.items():
if "default" not in prop:
continue
if args.get(key) is not None:
continue
default_value = prop["default"]
category = categories.get(key)
if category == "path":
path_params[key] = default_value
elif category == "query":
query_params[key] = default_value
elif category == "body":
body_params[key] = default_value
elif category == "header" and not api_key:
api_key = default_value
# Fall back to the environment-configured API key when the tool call
# doesn't carry an explicit header value.
if not api_key:
api_key = get_api_key() or None
if api_key:
mcp_logger.info("工具调用未带 API Key已回退到环境变量 datasourceApiKey")
client = _get_api_client()
result = client.request(
method=tool_def["method"],
path_template=tool_def["path"],
path_params=path_params,
query_params=query_params,
body=body_params if body_params else None,
file_path=file_path,
api_key=api_key,
)
return _text_response(result)
except Exception as e:
error_msg = f"工具调用失败: {name}, 错误: {e}"
mcp_logger.error(error_msg, exc_info=True)
return _text_response({"error": error_msg})
async def async_main():
"""Async entry for the MCP server."""
try:
mcp_logger.info("=" * 60)
mcp_logger.info("正在启动 MCP 服务: lzwcai-mcp-agile-db-third")
mcp_logger.info("版本: 0.1.0")
mcp_logger.info("=" * 60)
env = get_env_config()
mcp_logger.info(f"环境配置 - Backend Base URL: {env.get('backend_base_url')}")
mcp_logger.info(f"环境配置 - API Key: {'已设置' if env.get('api_key') else '未设置'}")
mcp_logger.info("=" * 60)
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
mcp_logger.info("MCP 服务已启动,等待客户端连接...")
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="lzwcai-mcp-agile-db-third",
server_version="0.1.2",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
mcp_logger.info("MCP 服务已关闭")
except Exception as e:
mcp_logger.error(f"MCP 服务运行失败: {e}", exc_info=True)
raise
def main():
"""Console entrypoint."""
try:
logger_config.setup_logging(
app_name="lzwcai_mcp_agile_db_third",
log_level=logging.INFO,
console_output=False,
)
mcp_logger.info("开始运行 MCP Agile DB Third 服务")
asyncio.run(async_main())
except KeyboardInterrupt:
mcp_logger.info("收到中断信号,正在关闭服务...")
except Exception as e:
mcp_logger.error(f"程序运行失败: {e}", exc_info=True)
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,137 @@
"""Docker-based PostgreSQL mock for local self-testing.
Provides a throwaway Postgres container that the backend can connect to during
external-connection tests. The container is bound to 0.0.0.0:5432 on the Docker
host, so the host IP must be reachable from the backend server.
"""
import socket
import subprocess
import sys
import time
from typing import Optional
DEFAULT_IMAGE = "postgres:16-alpine"
DEFAULT_CONTAINER = "lzwcai_mcp_agile_db_third_mock_pg"
DEFAULT_PORT = 5432
DEFAULT_DATABASE = "postgres"
DEFAULT_USERNAME = "postgres"
DEFAULT_PASSWORD = "postgres"
def _run(args: list, check: bool = False, capture: bool = True) -> subprocess.CompletedProcess:
"""Run a shell command and return the result."""
return subprocess.run(
args,
check=check,
capture_output=capture,
text=True,
shell=False,
)
def host_ip(prefer_prefixes: Optional[list] = None) -> str:
"""Return a non-loopback IPv4 address of this machine.
Prefers addresses starting with the given prefixes so users can steer the
result toward the network segment that the backend can reach.
"""
prefer_prefixes = prefer_prefixes or ["192.168.", "10.", "172."]
hostname = socket.gethostname()
_, _, ips = socket.gethostbyname_ex(hostname)
# Prefer addresses matching a requested prefix, then any non-loopback.
for prefix in prefer_prefixes:
for ip in ips:
if ip.startswith(prefix) and not ip.startswith("127."):
return ip
for ip in ips:
if not ip.startswith("127."):
return ip
raise RuntimeError("Could not find a non-loopback IPv4 address on this host")
def is_running(container: str = DEFAULT_CONTAINER) -> bool:
"""Check whether the mock container is currently running."""
result = _run(["docker", "ps", "--filter", f"name={container}", "--format", "{{.Names}}"])
return container in result.stdout
def start(
container: str = DEFAULT_CONTAINER,
port: int = DEFAULT_PORT,
username: str = DEFAULT_USERNAME,
password: str = DEFAULT_PASSWORD,
database: str = DEFAULT_DATABASE,
image: str = DEFAULT_IMAGE,
) -> str:
"""Start the Postgres mock container if not already running.
Returns the host IP that should be supplied to backend connection tests.
"""
if is_running(container):
print(f"Mock DB container '{container}' is already running.")
return host_ip()
# Remove any stale container with the same name.
_run(["docker", "rm", "-f", container], check=False)
print(f"Starting mock Postgres container '{container}' (image={image})...")
_run(
[
"docker", "run", "-d",
"--name", container,
"-p", f"{port}:{port}",
"-e", f"POSTGRES_USER={username}",
"-e", f"POSTGRES_PASSWORD={password}",
"-e", f"POSTGRES_DB={database}",
image,
],
check=True,
)
print("Waiting for Postgres to become ready...")
deadline = time.time() + 30
while time.time() < deadline:
result = _run(
["docker", "exec", container, "pg_isready", "-U", username],
check=False,
)
if result.returncode == 0:
break
time.sleep(0.5)
else:
stop(container)
raise RuntimeError("Postgres mock failed to become ready within 30s")
ip = host_ip()
print(f"Mock Postgres is ready at {ip}:{port} (user={username}, password={password}, db={database})")
return ip
def stop(container: str = DEFAULT_CONTAINER) -> None:
"""Stop and remove the mock Postgres container."""
print(f"Stopping mock DB container '{container}'...")
_run(["docker", "stop", "-t", "5", container], check=False)
_run(["docker", "rm", "-f", container], check=False)
def main():
if len(sys.argv) < 2:
print("Usage: python -m lzwcai_mcp_agile_db_third.mock_db [start|stop|ip]")
sys.exit(1)
cmd = sys.argv[1].lower()
if cmd == "start":
start()
elif cmd == "stop":
stop()
elif cmd in ("ip", "host"):
print(host_ip())
elif cmd == "status":
print("running" if is_running() else "stopped")
else:
print(f"Unknown command: {cmd}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "lzwcai-mcp-agile-db-third"
version = "0.1.3"
description = "MCP server for Agile DB third-party datasource APIs"
readme = "README.md"
requires-python = ">=3.12"
license = {text = "MIT"}
authors = [
{name = "lzwcai", email = "your-email@example.com"},
]
keywords = ["mcp", "datasource", "database", "agile-db"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.10.1",
]
[project.scripts]
lzwcai-mcp-agile-db-third = "lzwcai_mcp_agile_db_third.main:main"
[tool.hatch.build.targets.wheel]
packages = ["lzwcai_mcp_agile_db_third"]
[tool.hatch.build.targets.wheel.force-include]
[tool.hatch.build]
exclude = [
"lzwcai_mcp_agile_db_third/logs/**",
"**/.__pycache__/**",
"lzwcai_mcp_agile_db_third/.gitignore",
".venv/**",
]

View File

@@ -0,0 +1,870 @@
"""MCP tool definitions for datasource API."""
from typing import Any, Dict, List
def _tool(
name: str,
description: str,
method: str,
path: str,
properties: Dict[str, Any],
required: List[str],
param_categories: Dict[str, str],
) -> Dict[str, Any]:
"""Build a tool definition dictionary."""
return {
"name": name,
"description": description,
"method": method,
"path": path,
"inputSchema": {
"type": "object",
"properties": properties,
"required": required,
},
"paramCategories": param_categories,
}
# 公共参数 schema 片段
_API_KEY_PROP = {
"apiKey": {
"type": "string",
"description": "可选的 X-Datasource-API-Key 权限密钥",
}
}
_TARGET_PROP = {
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
}
}
_PAGE_PROPS = {
"pageNum": {"type": "integer", "description": "页码默认1", "default": 1},
"pageSize": {"type": "integer", "description": "每页数量默认10", "default": 10},
}
# =============================================================================
# 1. 数据源配置管理
# =============================================================================
LIST_DATASOURCE_CONFIGS = _tool(
name="list_datasource_configs",
description="查询数据源配置列表,支持分页和多条件筛选",
method="GET",
path="/datasource/config/list",
properties={
"datasourceName": {"type": "string", "description": "数据源名称(模糊查询)"},
"datasourceId": {"type": "integer", "description": "连接ID"},
"datasourceType": {"type": "string", "description": "数据库类型"},
"status": {"type": "integer", "description": "状态"},
"showTable": {"type": "boolean", "description": "是否显示表数量"},
**_PAGE_PROPS,
},
required=[],
param_categories={
"datasourceName": "query",
"datasourceId": "query",
"datasourceType": "query",
"status": "query",
"showTable": "query",
"pageNum": "query",
"pageSize": "query",
},
)
GET_DATASOURCE_CONFIG = _tool(
name="get_datasource_config",
description="获取指定数据源配置的详细信息",
method="GET",
path="/datasource/config/{id}",
properties={
"id": {"type": "integer", "description": "数据源配置ID"},
},
required=["id"],
param_categories={"id": "path"},
)
BATCH_CREATE_DATASOURCE_CONFIGS = _tool(
name="batch_create_datasource_configs",
description="批量创建数据源配置并同步表结构",
method="POST",
path="/datasource/config",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"datasourceNamePrefix": {"type": "string", "description": "数据源名称前缀"},
"enterpriseId": {"type": "integer", "description": "企业ID"},
"status": {"type": "integer", "description": "状态"},
"remark": {"type": "string", "description": "备注信息"},
"syncTables": {"type": "boolean", "description": "是否同步表结构"},
"databases": {
"type": "array",
"description": "数据库与表列表",
"items": {
"type": "object",
"properties": {
"databaseName": {"type": "string"},
"tableNames": {"type": "array", "items": {"type": "string"}},
},
},
},
},
required=["connectionId", "databases"],
param_categories={
"connectionId": "body",
"datasourceNamePrefix": "body",
"enterpriseId": "body",
"status": "body",
"remark": "body",
"syncTables": "body",
"databases": "body",
},
)
REPLACE_DATASOURCE_CONFIGS = _tool(
name="replace_datasource_configs",
description="全量替换指定连接下的数据源配置(未传入的配置将被删除),请谨慎操作",
method="PUT",
path="/datasource/config",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"datasourceNamePrefix": {"type": "string", "description": "数据源名称前缀"},
"enterpriseId": {"type": "integer", "description": "企业ID"},
"status": {"type": "integer", "description": "状态"},
"remark": {"type": "string", "description": "备注信息"},
"syncTables": {"type": "boolean", "description": "是否同步表结构"},
"databases": {
"type": "array",
"description": "数据库与表列表",
"items": {
"type": "object",
"properties": {
"databaseName": {"type": "string"},
"tableNames": {"type": "array", "items": {"type": "string"}},
},
},
},
},
required=["connectionId", "databases"],
param_categories={
"connectionId": "body",
"datasourceNamePrefix": "body",
"enterpriseId": "body",
"status": "body",
"remark": "body",
"syncTables": "body",
"databases": "body",
},
)
BATCH_UPDATE_DATASOURCE_CONFIGS = _tool(
name="batch_update_datasource_configs",
description="批量修改数据源配置并重新同步表结构",
method="PUT",
path="/datasource/config/batch",
properties={
"status": {"type": "integer", "description": "状态"},
"remark": {"type": "string", "description": "批量更新备注"},
"syncTables": {"type": "boolean", "description": "是否同步表结构"},
"datasources": {
"type": "array",
"description": "要更新的数据源配置列表",
"items": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"datasourceName": {"type": "string"},
"tableNames": {"type": "array", "items": {"type": "string"}},
},
},
},
},
required=["datasources"],
param_categories={
"status": "body",
"remark": "body",
"syncTables": "body",
"datasources": "body",
},
)
DELETE_DATASOURCE_CONFIGS = _tool(
name="delete_datasource_configs",
description="批量删除数据源配置",
method="POST",
path="/datasource/config/deletes",
properties={
"ids": {
"type": "array",
"description": "要删除的数据源配置ID列表",
"items": {"type": "integer"},
},
},
required=["ids"],
param_categories={"ids": "body"},
)
TEST_CONNECTION_CONFIG = _tool(
name="test_connection_config",
description="测试数据库连接是否正常",
method="POST",
path="/datasource/config/testConnectionConfig",
properties={
"host": {"type": "string", "description": "主机地址"},
"port": {"type": "integer", "description": "端口"},
"username": {"type": "string", "description": "用户名"},
"password": {"type": "string", "description": "密码"},
"datasourceType": {"type": "string", "description": "数据库类型,例如 PostgreSQL默认 PostgreSQL", "default": "PostgreSQL"},
},
required=["host", "port", "username", "password"],
param_categories={
"host": "body",
"port": "body",
"username": "body",
"password": "body",
"datasourceType": "body",
},
)
CHANGE_DATASOURCE_STATUS = _tool(
name="change_datasource_status",
description="修改数据源配置的启用/禁用状态",
method="PUT",
path="/datasource/config/changeStatus",
properties={
"id": {"type": "integer", "description": "数据源配置ID"},
"status": {"type": "integer", "description": "状态0正常/1停用"},
},
required=["id", "status"],
param_categories={"id": "body", "status": "body"},
)
EXPORT_DATASOURCE_CONFIGS = _tool(
name="export_datasource_configs",
description="导出数据源配置列表为Excel文件",
method="POST",
path="/datasource/config/export",
properties={
"datasourceName": {"type": "string", "description": "数据源名称(模糊查询)"},
"datasourceId": {"type": "integer", "description": "连接ID"},
"datasourceType": {"type": "string", "description": "数据库类型"},
"status": {"type": "integer", "description": "状态"},
},
required=[],
param_categories={
"datasourceName": "query",
"datasourceId": "query",
"datasourceType": "query",
"status": "query",
},
)
# =============================================================================
# 2. 数据库连接实例管理
# =============================================================================
LIST_CONNECTIONS = _tool(
name="list_connections",
description="查询数据库连接实例列表,支持分页和筛选",
method="GET",
path="/datasource/connection/list",
properties={
"datasourceName": {"type": "string", "description": "数据源名称(模糊查询)"},
"status": {"type": "integer", "description": "状态"},
"testStatus": {"type": "integer", "description": "测试状态0未测试/1成功/2失败"},
"sourceType": {"type": "string", "description": "连接来源builtin/external"},
**_PAGE_PROPS,
},
required=[],
param_categories={
"datasourceName": "query",
"status": "query",
"testStatus": "query",
"sourceType": "query",
"pageNum": "query",
"pageSize": "query",
},
)
GET_CONNECTION = _tool(
name="get_connection",
description="获取指定连接实例的详细信息",
method="GET",
path="/datasource/connection/{id}",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
},
required=["id"],
param_categories={"id": "path"},
)
CREATE_CONNECTION = _tool(
name="create_connection",
description="创建新的数据库连接实例",
method="POST",
path="/datasource/connection",
properties={
"datasourceName": {"type": "string", "description": "连接名称"},
"datasourceType": {"type": "string", "description": "数据库类型,例如 PostgreSQL默认 PostgreSQL", "default": "PostgreSQL"},
"connectionType": {"type": "string", "description": "连接方式,例如 user_password默认 user_password", "default": "user_password"},
"host": {"type": "string", "description": "主机地址"},
"port": {"type": "integer", "description": "端口"},
"username": {"type": "string", "description": "用户名"},
"password": {"type": "string", "description": "密码"},
"status": {"type": "integer", "description": "状态"},
"remark": {"type": "string", "description": "备注信息"},
},
required=["datasourceName", "host", "port", "username", "password"],
param_categories={
"datasourceName": "body",
"datasourceType": "body",
"connectionType": "body",
"host": "body",
"port": "body",
"username": "body",
"password": "body",
"status": "body",
"remark": "body",
},
)
UPDATE_CONNECTION = _tool(
name="update_connection",
description="修改数据库连接实例信息",
method="PUT",
path="/datasource/connection",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
"datasourceName": {"type": "string", "description": "连接名称"},
"host": {"type": "string", "description": "主机地址"},
"port": {"type": "integer", "description": "端口"},
"username": {"type": "string", "description": "用户名"},
"password": {"type": "string", "description": "密码"},
"remark": {"type": "string", "description": "更新备注"},
},
required=["id"],
param_categories={
"id": "body",
"datasourceName": "body",
"host": "body",
"port": "body",
"username": "body",
"password": "body",
"remark": "body",
},
)
DELETE_CONNECTION = _tool(
name="delete_connection",
description="删除指定的连接实例",
method="DELETE",
path="/datasource/connection/{id}",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
},
required=["id"],
param_categories={"id": "path"},
)
TEST_CONNECTION = _tool(
name="test_connection",
description="测试数据库连接是否可用",
method="POST",
path="/datasource/connection/test",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
"host": {"type": "string", "description": "主机地址"},
"port": {"type": "integer", "description": "端口"},
"username": {"type": "string", "description": "用户名"},
"password": {"type": "string", "description": "密码"},
"datasourceType": {"type": "string", "description": "数据库类型,例如 PostgreSQL默认 PostgreSQL", "default": "PostgreSQL"},
},
required=["host", "port", "username", "password"],
param_categories={
"id": "body",
"host": "body",
"port": "body",
"username": "body",
"password": "body",
"datasourceType": "body",
},
)
CHANGE_CONNECTION_STATUS = _tool(
name="change_connection_status",
description="修改连接实例的启用/禁用状态",
method="PUT",
path="/datasource/connection/changeStatus",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
"status": {"type": "integer", "description": "状态0正常/1停用"},
},
required=["id", "status"],
param_categories={"id": "body", "status": "body"},
)
REALTIME_STRUCTURE = _tool(
name="realtime_structure",
description="实时查询连接下的所有数据库和表结构(直接连接数据库服务器查询)",
method="GET",
path="/datasource/connection/realtime/structure/{id}",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
},
required=["id"],
param_categories={"id": "path"},
)
REALTIME_DATABASES = _tool(
name="realtime_databases",
description="实时查询连接下的所有数据库名称",
method="GET",
path="/datasource/connection/realtime/databases/{id}",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
},
required=["id"],
param_categories={"id": "path"},
)
REALTIME_TABLES = _tool(
name="realtime_tables",
description="实时查询指定数据库下的所有表",
method="GET",
path="/datasource/connection/realtime/tables/{id}",
properties={
"id": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "数据库名称"},
},
required=["id", "databaseName"],
param_categories={"id": "path", "databaseName": "query"},
)
CREATE_BUILTIN_POSTGRESQL = _tool(
name="create_builtin_postgresql",
description="创建内置 PostgreSQL 数据库连接(使用配置文件中的连接信息)",
method="POST",
path="/datasource/connection/create_builtin_postgresql",
properties={
"datasourceName": {"type": "string", "description": "连接名称"},
"remark": {"type": "string", "description": "备注"},
},
required=["datasourceName"],
param_categories={"datasourceName": "body", "remark": "body"},
)
UPDATE_BUILTIN_DATABASE = _tool(
name="update_builtin_database",
description="修改内置 PostgreSQL 连接信息",
method="PUT",
path="/datasource/connection/update_builtin_database",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"datasourceName": {"type": "string", "description": "新名称"},
"remark": {"type": "string", "description": "更新备注"},
},
required=["connectionId"],
param_categories={"connectionId": "body", "datasourceName": "body", "remark": "body"},
)
EXECUTE_SQL = _tool(
name="execute_sql",
description="在指定数据源上执行 SQL 语句,支持参数化查询和环境切换",
method="POST",
path="/datasource/connection/{datasourceId}/execute_sql",
properties={
"datasourceId": {"type": "integer", "description": "数据源配置ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"sql": {"type": "string", "description": "SQL 语句,使用 ? 占位符"},
"params": {
"type": "array",
"description": "SQL 参数列表",
"items": {},
},
"databaseName": {"type": "string", "description": "数据库名称"},
**_API_KEY_PROP,
},
required=["datasourceId", "sql"],
param_categories={
"datasourceId": "path",
"target": "query",
"sql": "body",
"params": "body",
"databaseName": "body",
"apiKey": "header",
},
)
CREATE_DATABASE = _tool(
name="create_database",
description="在指定连接上创建新数据库(目前仅支持 PostgreSQL",
method="POST",
path="/datasource/connection/{connectionId}/create_database",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "数据库名称"},
"encoding": {"type": "string", "description": "字符编码,例如 UTF8"},
"owner": {"type": "string", "description": "所有者"},
**_API_KEY_PROP,
},
required=["connectionId", "databaseName"],
param_categories={
"connectionId": "path",
"databaseName": "body",
"encoding": "body",
"owner": "body",
"apiKey": "header",
},
)
CREATE_TABLE = _tool(
name="create_table",
description="在指定数据库中创建新表",
method="POST",
path="/datasource/connection/{connectionId}/create_table",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "数据库名称"},
"tableName": {"type": "string", "description": "表名"},
"tableComment": {"type": "string", "description": "表注释"},
"columns": {
"type": "array",
"description": "列定义列表",
"items": {
"type": "object",
"properties": {
"columnName": {"type": "string"},
"columnType": {"type": "string"},
"columnLength": {"type": "integer"},
"isPrimaryKey": {"type": "boolean"},
"isNullable": {"type": "boolean"},
"columnComment": {"type": "string"},
},
},
},
**_API_KEY_PROP,
},
required=["connectionId", "databaseName", "tableName", "columns"],
param_categories={
"connectionId": "path",
"databaseName": "body",
"tableName": "body",
"tableComment": "body",
"columns": "body",
"apiKey": "header",
},
)
CREATE_DATABASE_TABLE = _tool(
name="create_database_table",
description="同时创建数据库和表(一次性操作)",
method="POST",
path="/datasource/connection/{connectionId}/create_database_table",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "数据库名称"},
"encoding": {"type": "string", "description": "字符编码"},
"owner": {"type": "string", "description": "所有者"},
"tables": {
"type": "array",
"description": "要创建的表列表",
"items": {
"type": "object",
"properties": {
"tableName": {"type": "string"},
"tableComment": {"type": "string"},
"columns": {"type": "array"},
},
},
},
**_API_KEY_PROP,
},
required=["connectionId", "databaseName", "tables"],
param_categories={
"connectionId": "path",
"databaseName": "body",
"encoding": "body",
"owner": "body",
"tables": "body",
"apiKey": "header",
},
)
ALTER_DATABASE = _tool(
name="alter_database",
description="修改数据库属性(重命名、更改所有者、更改编码)",
method="PUT",
path="/datasource/connection/{connectionId}/alter_database",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "原数据库名称"},
"newName": {"type": "string", "description": "新数据库名称"},
"newOwner": {"type": "string", "description": "新所有者"},
"newEncoding": {"type": "string", "description": "新字符编码"},
**_API_KEY_PROP,
},
required=["connectionId", "databaseName"],
param_categories={
"connectionId": "path",
"databaseName": "body",
"newName": "body",
"newOwner": "body",
"newEncoding": "body",
"apiKey": "header",
},
)
ALTER_TABLE = _tool(
name="alter_table",
description="修改表结构(添加列、删除列、重命名列、修改列类型等)",
method="PUT",
path="/datasource/connection/{connectionId}/alter_table",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"databaseName": {"type": "string", "description": "数据库名称"},
"tableName": {"type": "string", "description": "表名"},
"tableComment": {"type": "string", "description": "表注释"},
"operations": {
"type": "array",
"description": "操作列表",
"items": {
"type": "object",
"properties": {
"operation": {"type": "string"},
"column": {"type": "object"},
},
},
},
**_API_KEY_PROP,
},
required=["connectionId", "databaseName", "tableName", "operations"],
param_categories={
"connectionId": "path",
"databaseName": "body",
"tableName": "body",
"tableComment": "body",
"operations": "body",
"apiKey": "header",
},
)
GENERATE_TABLE = _tool(
name="generate_table",
description="使用 AI 根据需求描述生成表结构(异步任务)",
method="POST",
path="/datasource/connection/generate_table",
properties={
"requirement": {"type": "string", "description": "需求描述"},
"databaseId": {"type": "integer", "description": "关联的数据库ID"},
},
required=["requirement"],
param_categories={"requirement": "body", "databaseId": "body"},
)
IMPORT_DOCUMENT_PREVIEW = _tool(
name="import_document_preview",
description="上传 Excel/CSV 文件AI 识别表结构并预览前 10 条数据",
method="POST",
path="/datasource/connection/{connectionId}/import_document/preview",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"filePath": {"type": "string", "description": "本地 Excel/CSV 文件路径"},
},
required=["connectionId", "filePath"],
param_categories={
"connectionId": "path",
"target": "query",
"filePath": "file",
},
)
IMPORT_DOCUMENT_CONFIRM = _tool(
name="import_document_confirm",
description="确认导入,创建表并插入数据",
method="POST",
path="/datasource/connection/{connectionId}/import_document/confirm",
properties={
"connectionId": {"type": "integer", "description": "连接实例ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"tableStructure": {
"type": "object",
"description": "表结构定义",
},
"allData": {
"type": "array",
"description": "完整导入数据",
},
**_API_KEY_PROP,
},
required=["connectionId", "tableStructure", "allData"],
param_categories={
"connectionId": "path",
"target": "query",
"tableStructure": "body",
"allData": "body",
"apiKey": "header",
},
)
BUILTIN_TABLE_DATA = _tool(
name="builtin_table_data",
description="根据表ID查询表结构和数据分页",
method="GET",
path="/datasource/connection/builtin/table/{tableId}",
properties={
"tableId": {"type": "integer", "description": "表ID"},
"pageNum": {"type": "integer", "description": "页码默认1"},
"pageSize": {"type": "integer", "description": "每页数量默认100"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
},
required=["tableId"],
param_categories={
"tableId": "path",
"pageNum": "query",
"pageSize": "query",
"target": "query",
},
)
BUILTIN_TABLE_INSERT = _tool(
name="builtin_table_insert",
description="向指定表插入一条数据",
method="POST",
path="/datasource/connection/builtin/table/{tableId}/rows",
properties={
"tableId": {"type": "integer", "description": "表ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"data": {"type": "object", "description": "要插入的数据"},
},
required=["tableId", "data"],
param_categories={
"tableId": "path",
"target": "query",
"data": "body",
},
)
BUILTIN_TABLE_UPDATE = _tool(
name="builtin_table_update",
description="根据主键更新表数据",
method="PUT",
path="/datasource/connection/builtin/table/{tableId}/rows",
properties={
"tableId": {"type": "integer", "description": "表ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"data": {"type": "object", "description": "要更新的数据"},
"primaryKey": {"type": "object", "description": "主键条件"},
},
required=["tableId", "data", "primaryKey"],
param_categories={
"tableId": "path",
"target": "query",
"data": "body",
"primaryKey": "body",
},
)
BUILTIN_TABLE_DELETE = _tool(
name="builtin_table_delete",
description="根据主键批量删除表数据",
method="DELETE",
path="/datasource/connection/builtin/table/{tableId}/rows",
properties={
"tableId": {"type": "integer", "description": "表ID"},
"target": {
"type": "string",
"description": "目标环境prod/test默认 prod",
"default": "prod",
},
"primaryKeys": {
"type": "array",
"description": "主键条件列表",
"items": {"type": "object"},
},
},
required=["tableId", "primaryKeys"],
param_categories={
"tableId": "path",
"target": "query",
"primaryKeys": "body",
},
)
ALL_TOOLS = [
LIST_DATASOURCE_CONFIGS,
GET_DATASOURCE_CONFIG,
BATCH_CREATE_DATASOURCE_CONFIGS,
REPLACE_DATASOURCE_CONFIGS,
BATCH_UPDATE_DATASOURCE_CONFIGS,
DELETE_DATASOURCE_CONFIGS,
TEST_CONNECTION_CONFIG,
CHANGE_DATASOURCE_STATUS,
EXPORT_DATASOURCE_CONFIGS,
LIST_CONNECTIONS,
GET_CONNECTION,
CREATE_CONNECTION,
UPDATE_CONNECTION,
DELETE_CONNECTION,
TEST_CONNECTION,
CHANGE_CONNECTION_STATUS,
REALTIME_STRUCTURE,
REALTIME_DATABASES,
REALTIME_TABLES,
CREATE_BUILTIN_POSTGRESQL,
UPDATE_BUILTIN_DATABASE,
EXECUTE_SQL,
CREATE_DATABASE,
CREATE_TABLE,
CREATE_DATABASE_TABLE,
ALTER_DATABASE,
ALTER_TABLE,
GENERATE_TABLE,
IMPORT_DOCUMENT_PREVIEW,
IMPORT_DOCUMENT_CONFIRM,
BUILTIN_TABLE_DATA,
BUILTIN_TABLE_INSERT,
BUILTIN_TABLE_UPDATE,
BUILTIN_TABLE_DELETE,
]
TOOL_MAP = {tool["name"]: tool for tool in ALL_TOOLS}
def list_tools() -> List[Dict[str, Any]]:
"""Return all tool definitions."""
return ALL_TOOLS
def get_tool(name: str) -> Dict[str, Any]:
"""Return a tool definition by name."""
return TOOL_MAP.get(name)

View File

@@ -0,0 +1,14 @@
"""Utils package for lzwcai_mcp_agile_db_third."""
from .env_config import get_backend_base_url, get_api_key, get_env_config
from .api_client import DataSourceAPIClient, api_request
from .logger_config import logger_config
__all__ = [
"get_backend_base_url",
"get_api_key",
"get_env_config",
"DataSourceAPIClient",
"api_request",
"logger_config",
]

View File

@@ -0,0 +1,223 @@
"""Backend API client for datasource APIs."""
import base64
import logging
import mimetypes
import os
from typing import Any, Dict, Optional
from urllib.parse import urlencode
import httpx
try:
from .env_config import get_backend_base_url
except ImportError:
from env_config import get_backend_base_url
logger = logging.getLogger(__name__)
DEFAULT_TOKEN = (
""
)
_SENSITIVE_FIELDS = {"password", "apiKey", "token", "secret"}
def _mask_secret(value: Optional[str]) -> str:
"""Mask a secret for logging, keeping only a few leading/trailing chars."""
if not value:
return "<空>"
if len(value) <= 8:
return "***"
return f"{value[:4]}...{value[-4:]} (len={len(value)})"
class DataSourceAPIClient:
"""HTTP client for backend datasource APIs."""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
):
self.base_url = (base_url or get_backend_base_url()).rstrip("/")
# Prefer explicit token, then API_KEY env var, then built-in default.
resolved = token or os.environ.get("API_KEY") or DEFAULT_TOKEN
self.token = resolved.removeprefix("Bearer ").strip()
self.client = httpx.Client(timeout=120.0)
def close(self) -> None:
"""Close the underlying HTTP client."""
self.client.close()
def _get_headers(self, api_key: Optional[str] = None) -> Dict[str, str]:
headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/json",
}
if api_key:
headers["X-Datasource-API-Key"] = api_key
return headers
@staticmethod
def _sanitize_value(value: Any) -> Any:
"""Redact sensitive nested values for logging."""
if isinstance(value, dict):
return {
k: "***" if k in _SENSITIVE_FIELDS else DataSourceAPIClient._sanitize_value(v)
for k, v in value.items()
}
if isinstance(value, list):
return [DataSourceAPIClient._sanitize_value(item) for item in value]
return value
@staticmethod
def _file_tuple(file_path: str) -> tuple:
"""Build a multipart file tuple with filename and content type."""
filename = os.path.basename(file_path)
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
return (filename, open(file_path, "rb"), content_type)
def request(
self,
method: str,
path_template: str,
path_params: Optional[Dict[str, Any]] = None,
query_params: Optional[Dict[str, Any]] = None,
body: Optional[Dict[str, Any]] = None,
file_path: Optional[str] = None,
api_key: Optional[str] = None,
) -> Dict[str, Any]:
"""Send a request and return JSON response."""
path_params = path_params or {}
query_params = query_params or {}
try:
url = self._build_url(path_template, path_params, query_params)
headers = self._get_headers(api_key)
safe_body = self._sanitize_value(body) if body else body
logger.info(f"调用后端 API: {method} {url}")
logger.info(
f"请求密钥 - X-Datasource-API-Key: {_mask_secret(api_key)}, "
f"Authorization token: {_mask_secret(self.token)}"
)
logger.debug(
f"path_params={path_params}, query_params={query_params}, "
f"body={safe_body}, file_path={file_path}"
)
method = method.upper()
if method == "GET":
response = self.client.get(url, headers=headers)
elif method == "DELETE":
if body is not None:
headers["Content-Type"] = "application/json"
response = self.client.request(method, url, headers=headers, json=body)
else:
response = self.client.delete(url, headers=headers)
elif file_path:
# Multipart upload
headers.pop("Content-Type", None)
file_tuple = self._file_tuple(file_path)
files = {"file": file_tuple}
try:
response = self.client.request(
method, url, headers=headers, data=body, files=files
)
finally:
file_tuple[1].close()
elif body is not None:
headers["Content-Type"] = "application/json"
response = self.client.request(method, url, headers=headers, json=body)
else:
response = self.client.request(method, url, headers=headers)
response.raise_for_status()
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
data = response.json()
else:
# Non-JSON response (e.g., Excel export), encode as base64
data = {
"success": True,
"contentType": content_type,
"fileBase64": base64.b64encode(response.content).decode("ascii"),
"message": "后端返回非 JSON 内容,已使用 base64 编码",
}
logger.info(f"后端 API 调用成功: {method} {url}")
logger.debug(f"响应: {data}")
return data
except httpx.TimeoutException as e:
error_msg = f"API 请求超时: {method} {path_template}"
logger.error(error_msg)
raise Exception(error_msg) from e
except httpx.HTTPStatusError as e:
error_msg = f"API 请求失败 (HTTP {e.response.status_code}): {e.response.text}"
logger.error(error_msg)
raise Exception(error_msg) from e
except httpx.RequestError as e:
error_msg = f"API 请求异常: {method} {path_template}, 错误: {e}"
logger.error(error_msg)
raise Exception(error_msg) from e
except Exception as e:
error_msg = f"处理 API 响应时出错: {e}"
logger.error(error_msg, exc_info=True)
raise Exception(error_msg) from e
def _build_url(
self,
path_template: str,
path_params: Dict[str, Any],
query_params: Dict[str, Any],
) -> str:
path = path_template
for key, value in path_params.items():
path = path.replace(f"{{{key}}}", str(value))
url = f"{self.base_url}{path}"
if query_params:
filtered = {k: v for k, v in query_params.items() if v is not None}
if filtered:
url = f"{url}?{urlencode(filtered)}"
return url
def api_request(
method: str,
path_template: str,
path_params: Optional[Dict[str, Any]] = None,
query_params: Optional[Dict[str, Any]] = None,
body: Optional[Dict[str, Any]] = None,
file_path: Optional[str] = None,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
token: Optional[str] = None,
) -> Dict[str, Any]:
"""Convenience wrapper for one-off API requests."""
client = DataSourceAPIClient(base_url=base_url, token=token)
try:
return client.request(
method=method,
path_template=path_template,
path_params=path_params,
query_params=query_params,
body=body,
file_path=file_path,
api_key=api_key,
)
finally:
client.close()
# Module-level default client
_default_client = DataSourceAPIClient()
def get_default_client() -> DataSourceAPIClient:
return _default_client

View File

@@ -0,0 +1,31 @@
"""环境变量配置模块"""
import os
def get_backend_base_url(default: str = "http://lzwcai-demp-corp-manager:8086") -> str:
"""
获取后端 API 基础 URL。
Environment Variables:
backendBaseUrl: 后端 API 基础 URL
"""
return os.environ.get("backendBaseUrl", default)
def get_api_key(default: str = "") -> str:
"""
获取默认 API 密钥X-Datasource-API-Key
Environment Variables:
datasourceApiKey: 默认 API 密钥
"""
return os.environ.get("datasourceApiKey", default)
def get_env_config() -> dict:
"""获取所有环境配置。"""
return {
"backend_base_url": get_backend_base_url(),
"api_key": get_api_key(),
}

View File

@@ -0,0 +1,85 @@
"""统一日志配置模块"""
import logging
import os
import sys
from pathlib import Path
class LoggerConfig:
"""日志配置管理类"""
def __init__(self, logs_dir: str = None):
if logs_dir:
self.logs_dir = Path(logs_dir)
else:
project_root = Path(__file__).parent.parent
self.logs_dir = project_root / "logs"
self.logs_dir.mkdir(parents=True, exist_ok=True)
self.log_format = "%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
self.date_format = "%Y-%m-%d %H:%M:%S"
self._initialized = False
def setup_logging(
self,
app_name: str = "lzwcai_mcp_agile_db_third",
log_level: int = logging.INFO,
console_output: bool = False,
) -> logging.Logger:
"""设置系统日志配置。"""
if self._initialized:
return logging.getLogger()
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
formatter = logging.Formatter(self.log_format, self.date_format)
# 主日志文件
main_log_file = self.logs_dir / f"{app_name}.log"
file_handler = logging.FileHandler(main_log_file, encoding="utf-8")
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# 错误日志文件
error_log_file = self.logs_dir / f"{app_name}_error.log"
error_handler = logging.FileHandler(error_log_file, encoding="utf-8")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
root_logger.addHandler(error_handler)
# MCP 使用 stdio 时,控制台日志必须输出到 stderr
if console_output:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
self._initialized = True
root_logger.info(f"日志系统初始化完成 - 日志目录: {self.logs_dir}")
return root_logger
def setup_mcp_logging(self) -> logging.Logger:
"""设置 MCP 专用日志。"""
return self.create_component_logger("mcp_services", "mcp_services.log", logging.DEBUG)
def create_component_logger(
self, component_name: str, log_file: str = None, log_level: int = None
) -> logging.Logger:
"""为特定组件创建独立日志器。"""
logger = logging.getLogger(component_name)
if log_file:
component_log_file = self.logs_dir / log_file
handler = logging.FileHandler(component_log_file, encoding="utf-8")
handler.setFormatter(logging.Formatter(self.log_format, self.date_format))
if log_level is not None:
handler.setLevel(log_level)
logger.addHandler(handler)
return logger
logger_config = LoggerConfig()

View File

@@ -0,0 +1,17 @@
"""
Repository-local launcher for lzwcai-mcp-sqlexecutor.
"""
import os
def main():
# Keep local developer defaults without overriding explicit environment settings.
os.environ.setdefault("backendBaseUrl", "http://192.168.2.236:8088")
os.environ.setdefault("datasourceApiKey", "yBOHyhCSpExAoEleimSfhbRzsF6SDiPYdGGwowXG-Sk")
from lzwcai_mcp_agile_db_third.main import main
main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "lzwcai-mcp-agile-db-third"
version = "0.1.5"
description = "MCP server for Agile DB third-party datasource APIs"
readme = "README.md"
requires-python = ">=3.12"
license = {text = "MIT"}
authors = [
{name = "lzwcai", email = "your-email@example.com"},
]
keywords = ["mcp", "datasource", "database", "agile-db"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.10.1",
]
[project.scripts]
lzwcai-mcp-agile-db-third = "lzwcai_mcp_agile_db_third.main:main"
[tool.hatch.build.targets.wheel]
packages = ["lzwcai_mcp_agile_db_third"]
[tool.hatch.build.targets.wheel.force-include]
[tool.hatch.build]
exclude = [
"lzwcai_mcp_agile_db_third/logs/**",
"**/.__pycache__/**",
"lzwcai_mcp_agile_db_third/.gitignore",
".venv/**",
]

View File

@@ -0,0 +1,418 @@
"""End-to-end self-test for all lzwcai_mcp_agile_db_third MCP tools.
Runs every tool through the real MCP handler (handle_call_tool) against the
backend configured below. Uses selftest_-prefixed throwaway resources and
cleans them up at the end. The destructive full-replace tool is skipped.
"""
import asyncio
import csv
import json
import os
import sys
import tempfile
import uuid
os.environ.setdefault("backendBaseUrl", "http://192.168.2.236:8088")
# Login token (Authorization Bearer) shared with the first-party AgileDB MCP server.
os.environ.setdefault(
"API_KEY",
"Bearer eyJhbGciOiJIUzUxMiJ9.eyJ0b2tlbl90eXBlIjoiTE9HSU4iLCJsb2dpbl91c2VyX2tleSI6"
"ImNlMDAwYjA4LWU0YTYtNGM2MS1hNzJiLWI3NTlmNmY1N2Q4NCJ9.jiNmGQZfL4-nSIFrLuaCt7mT"
"5zj0FOojAVkLeHwPOroI5jBxodrCe1PSwGO1OHq5Ztb0tLEVZw2FFVj0OlTceQ",
)
# Optional X-Datasource-API-Key for datasource-level permission checks (if enforced).
os.environ.setdefault("datasourceApiKey", "Mggkz34Yk8cbjUvCvQ-qeooNRg62WhSwwtxUUV6e0Pg")
HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE)
from lzwcai_mcp_agile_db_third.main import handle_call_tool # noqa: E402
from lzwcai_mcp_agile_db_third.mock_db import start as start_mock_db, stop as stop_mock_db # noqa: E402
from lzwcai_mcp_agile_db_third.tools import ALL_TOOLS # noqa: E402
RESULTS = [] # (name, status, detail)
TESTED = set()
async def call(name, args=None):
"""Invoke a tool through the MCP handler and return parsed JSON."""
TESTED.add(name)
try:
resp = await handle_call_tool(name, args or {})
return json.loads(resp[0].text)
except Exception as e: # noqa: BLE001
return {"_exception": repr(e)}
def summarize(data):
"""Extract a short (ok, detail) from a backend response."""
if not isinstance(data, dict):
return False, str(data)[:200]
if "_exception" in data:
return False, data["_exception"][:200]
if "error" in data:
return False, str(data["error"])[:200]
code = data.get("code")
msg = data.get("msg", "")
if code == 200:
return True, f"code=200 msg={msg}"
if code is not None:
return False, f"code={code} msg={msg}"
# list/paginated or already-unwrapped payloads
return True, json.dumps(data, ensure_ascii=False)[:160]
def record(name, data, mode="ok"):
"""Record a tool result.
mode="ok" -> PASS only when backend code==200
mode="roundtrip" -> PASS when the call round-trips (no exception and the
backend returned a structured response), regardless of
business code. Used for ops whose success depends on a
real reachable DB we can't guarantee in self-test.
"""
ok, detail = summarize(data)
if mode == "roundtrip":
exception = isinstance(data, dict) and "_exception" in data
passed = not exception
else:
passed = ok
status = "PASS" if passed else "FAIL"
RESULTS.append((name, status, detail))
print(f"[{status}] {name}: {detail}")
return data
def dig(data, *keys, default=None):
"""Safely walk nested dict keys."""
cur = data
for k in keys:
if not isinstance(cur, dict):
return default
cur = cur.get(k)
return cur if cur is not None else default
async def main():
PREFIX = f"selftest_{uuid.uuid4().hex[:8]}_"
state = {}
# ---- 1. read-only: connections & configs ----------------------------
print("\n=== 只读查询 ===")
conns = record("list_connections", await call("list_connections", {"pageSize": 5}))
# pick a builtin connection to exercise builtin/realtime tools
rows = conns.get("rows") or dig(conns, "data", "rows", default=[]) or []
builtin_id = None
any_conn_id = None
for r in rows:
cid = r.get("id")
if any_conn_id is None:
any_conn_id = cid
if r.get("sourceType") == "builtin" and builtin_id is None:
builtin_id = cid
state["builtin_id"] = builtin_id
state["any_conn_id"] = any_conn_id
print(f" -> builtin_id={builtin_id}, any_conn_id={any_conn_id}")
record("list_datasource_configs", await call("list_datasource_configs", {"pageSize": 5}))
if any_conn_id:
record("get_connection", await call("get_connection", {"id": any_conn_id}))
record("realtime_databases", await call("realtime_databases", {"id": any_conn_id}))
record("realtime_structure", await call("realtime_structure", {"id": any_conn_id}))
else:
for n in ("get_connection", "realtime_databases", "realtime_structure"):
RESULTS.append((n, "SKIP", "no connection available"))
# ---- 2. create a builtin PostgreSQL connection ----------------------
print("\n=== 创建内置连接 ===")
created = record(
"create_builtin_postgresql",
await call("create_builtin_postgresql", {
"datasourceName": PREFIX + "conn",
"remark": "self-test connection",
}),
)
conn_id = dig(created, "data", "id")
if conn_id is None and builtin_id:
conn_id = builtin_id # fall back to an existing builtin for downstream tools
state["conn_id"] = conn_id
print(f" -> conn_id={conn_id}")
if conn_id:
record("update_builtin_database", await call("update_builtin_database", {
"connectionId": conn_id,
"datasourceName": PREFIX + "conn_renamed",
"remark": "renamed by self-test",
}))
record("get_connection", await call("get_connection", {"id": conn_id}))
# ---- 3. external connection: test/create/update/status/delete -------
print("\n=== 外部连接测试(依赖真实可达的库,可能失败属正常)===")
# Try to spin up a local Docker Postgres mock. If Docker is unavailable or
# the backend cannot reach the host IP, fall back to 127.0.0.1 and keep the
# round-trip assertion (we still verify the tool round-trips).
mock_host = "127.0.0.1"
try:
mock_host = start_mock_db()
print(f" -> mock DB available at {mock_host}:5432")
except Exception as e: # noqa: BLE001
print(f" -> could not start mock DB, using 127.0.0.1:5432 ({e})")
# test_connection_config / test_connection rely on datasourceType default now
record(
"test_connection_config",
await call("test_connection_config", {
"host": mock_host, "port": 5432,
"username": "postgres", "password": "postgres",
}),
mode="roundtrip", # may still fail if backend cannot reach mock_host
)
# create_connection (external) — exercises datasourceType/connectionType defaults
ext = record(
"create_connection",
await call("create_connection", {
"datasourceName": PREFIX + "ext",
"host": mock_host, "port": 5432,
"username": "postgres", "password": "postgres",
"remark": "self-test external",
}),
mode="roundtrip",
)
ext_id = dig(ext, "data", "id")
state["ext_id"] = ext_id
print(f" -> ext_id={ext_id}")
if ext_id:
record("update_connection", await call("update_connection", {
"id": ext_id, "datasourceName": PREFIX + "ext_renamed",
"remark": "renamed",
}), mode="roundtrip")
record("test_connection", await call("test_connection", {
"id": ext_id, "host": mock_host, "port": 5432,
"username": "postgres", "password": "postgres",
}), mode="roundtrip")
record("change_connection_status", await call("change_connection_status", {
"id": ext_id, "status": 1,
}), mode="roundtrip")
else:
for n in ("update_connection", "test_connection", "change_connection_status"):
RESULTS.append((n, "SKIP", "no external connection id returned"))
# ---- 4. DDL on the builtin connection ------------------------------
print("\n=== DDL库/表 ===")
ddl_conn = conn_id or builtin_id
db_name = PREFIX + "db"
tbl_name = PREFIX + "users"
cols = [
{"columnName": "id", "columnType": "BIGINT", "isPrimaryKey": True,
"isNullable": False, "columnComment": "主键"},
{"columnName": "name", "columnType": "VARCHAR", "columnLength": 100,
"isNullable": False, "columnComment": "姓名"},
{"columnName": "age", "columnType": "INTEGER", "isNullable": True,
"columnComment": "年龄"},
]
if ddl_conn:
record("create_database", await call("create_database", {
"connectionId": ddl_conn, "databaseName": db_name,
"encoding": "UTF8",
}), mode="roundtrip")
record("create_table", await call("create_table", {
"connectionId": ddl_conn, "databaseName": db_name,
"tableName": tbl_name, "tableComment": "self-test 用户表",
"columns": cols,
}), mode="roundtrip")
record("realtime_tables", await call("realtime_tables", {
"id": ddl_conn, "databaseName": db_name,
}), mode="roundtrip")
record("create_database_table", await call("create_database_table", {
"connectionId": ddl_conn, "databaseName": PREFIX + "db2",
"encoding": "UTF8",
"tables": [{"tableName": PREFIX + "t2", "tableComment": "二号表",
"columns": cols}],
}), mode="roundtrip")
record("alter_table", await call("alter_table", {
"connectionId": ddl_conn, "databaseName": db_name,
"tableName": tbl_name,
"operations": [{"operation": "ADD_COLUMN", "column": {
"columnName": "email", "columnType": "VARCHAR",
"columnLength": 255, "isNullable": True, "columnComment": "邮箱"}}],
}), mode="roundtrip")
record("alter_database", await call("alter_database", {
"connectionId": ddl_conn, "databaseName": PREFIX + "db2",
"newName": PREFIX + "db2_renamed",
}), mode="roundtrip")
else:
for n in ("create_database", "create_table", "create_database_table",
"alter_table", "alter_database"):
RESULTS.append((n, "SKIP", "no builtin connection for DDL"))
# ---- 5. find the table id, exercise execute_sql + builtin CRUD ------
print("\n=== SQL 执行 + 内置表数据 CRUD ===")
# locate a datasource config id + table id for our table
cfgs = await call("list_datasource_configs", {"datasourceName": PREFIX, "pageSize": 20})
cfg_rows = cfgs.get("rows") or dig(cfgs, "data", "rows", default=[]) or []
datasource_id = cfg_rows[0].get("id") if cfg_rows else None
state["datasource_id"] = datasource_id
table_id = None
if ddl_conn:
detail = await call("get_connection", {"id": ddl_conn})
for ds in dig(detail, "data", "datasourceConfig", default=[]) or []:
for t in ds.get("tables", []) or []:
if str(t.get("tableName", "")).startswith(PREFIX):
table_id = t.get("tableId") or t.get("id")
break
if table_id:
break
state["table_id"] = table_id
print(f" -> datasource_id={datasource_id}, table_id={table_id}")
if datasource_id:
record("execute_sql", await call("execute_sql", {
"datasourceId": datasource_id,
"sql": "SELECT 1",
"databaseName": db_name,
}), mode="roundtrip")
else:
RESULTS.append(("execute_sql", "SKIP", "no datasource config id"))
if table_id:
record("builtin_table_insert", await call("builtin_table_insert", {
"tableId": table_id, "data": {"id": 1, "name": "张三", "age": 25},
}), mode="roundtrip")
record("builtin_table_data", await call("builtin_table_data", {
"tableId": table_id,
}), mode="roundtrip")
record("builtin_table_update", await call("builtin_table_update", {
"tableId": table_id, "data": {"name": "李四", "age": 30},
"primaryKey": {"id": 1},
}), mode="roundtrip")
record("builtin_table_delete", await call("builtin_table_delete", {
"tableId": table_id, "primaryKeys": [{"id": 1}],
}), mode="roundtrip")
else:
for n in ("builtin_table_insert", "builtin_table_data",
"builtin_table_update", "builtin_table_delete"):
RESULTS.append((n, "SKIP", "no table id found"))
# ---- 6. AI generate + document import (preview/confirm) -------------
print("\n=== AI 生成 + 文档导入 ===")
record("generate_table", await call("generate_table", {
"requirement": "一个简单的待办事项表,含标题、状态、创建时间",
"databaseId": datasource_id,
}), mode="roundtrip")
# build a tiny CSV for import preview
csv_path = os.path.join(tempfile.gettempdir(), "selftest_import.csv")
with open(csv_path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(["name", "age", "gender"])
w.writerow(["张三", "25", "male"])
w.writerow(["李四", "30", "female"])
preview = None
if ddl_conn:
preview = record("import_document_preview", await call("import_document_preview", {
"connectionId": ddl_conn, "filePath": csv_path,
}), mode="roundtrip")
ts = dig(preview, "data", "tableStructure")
all_data = dig(preview, "data", "allData")
if ts and all_data:
record("import_document_confirm", await call("import_document_confirm", {
"connectionId": ddl_conn, "tableStructure": ts, "allData": all_data,
}), mode="roundtrip")
else:
RESULTS.append(("import_document_confirm", "SKIP",
"preview returned no tableStructure/allData"))
else:
for n in ("import_document_preview", "import_document_confirm"):
RESULTS.append((n, "SKIP", "no builtin connection"))
# ---- 7. datasource config batch ops --------------------------------
print("\n=== 数据源配置批量操作 ===")
if ddl_conn:
record("batch_create_datasource_configs", await call(
"batch_create_datasource_configs", {
"connectionId": ddl_conn,
"datasourceNamePrefix": PREFIX + "cfg",
"syncTables": False,
"databases": [{"databaseName": db_name, "tableNames": []}],
}), mode="roundtrip")
else:
RESULTS.append(("batch_create_datasource_configs", "SKIP", "no connection"))
if datasource_id:
record("get_datasource_config", await call("get_datasource_config", {
"id": datasource_id}), mode="roundtrip")
record("change_datasource_status", await call("change_datasource_status", {
"id": datasource_id, "status": 1}), mode="roundtrip")
record("batch_update_datasource_configs", await call(
"batch_update_datasource_configs", {
"syncTables": False,
"datasources": [{"id": datasource_id, "tableNames": []}],
}), mode="roundtrip")
else:
for n in ("get_datasource_config", "change_datasource_status",
"batch_update_datasource_configs"):
RESULTS.append((n, "SKIP", "no datasource config id"))
record("export_datasource_configs", await call("export_datasource_configs", {
"datasourceName": PREFIX}), mode="roundtrip")
# replace_datasource_configs intentionally skipped (destructive)
RESULTS.append(("replace_datasource_configs", "SKIP",
"destructive full-replace, skipped by design"))
# ---- 8. cleanup -----------------------------------------------------
print("\n=== 清理 selftest_ 资源 ===")
# delete selftest datasource configs
cfgs2 = await call("list_datasource_configs", {"datasourceName": PREFIX, "pageSize": 50})
cfg_rows2 = cfgs2.get("rows") or dig(cfgs2, "data", "rows", default=[]) or []
del_ids = [r.get("id") for r in cfg_rows2 if r.get("id") is not None]
if del_ids:
record("delete_datasource_configs", await call("delete_datasource_configs", {
"ids": del_ids}), mode="roundtrip")
else:
RESULTS.append(("delete_datasource_configs", "SKIP", "nothing to delete"))
# delete selftest connections
deleted_conn = False
for cid in (state.get("ext_id"), conn_id):
if cid:
record("delete_connection", await call("delete_connection", {"id": cid}),
mode="roundtrip")
deleted_conn = True
if not deleted_conn:
RESULTS.append(("delete_connection", "SKIP", "no selftest connection to delete"))
try:
os.remove(csv_path)
except OSError:
pass
try:
stop_mock_db()
except Exception as e: # noqa: BLE001
print(f" -> failed to stop mock DB: {e}")
# ---- summary --------------------------------------------------------
print("\n" + "=" * 60)
print("自测结果汇总")
print("=" * 60)
all_names = {t["name"] for t in ALL_TOOLS}
counts = {"PASS": 0, "FAIL": 0, "SKIP": 0}
for name, status, detail in RESULTS:
counts[status] = counts.get(status, 0) + 1
for name, status, detail in RESULTS:
print(f" [{status}] {name}: {detail}")
untested = all_names - TESTED
print("-" * 60)
print(f"工具总数: {len(all_names)} 覆盖: {len(TESTED)} 未触达: {sorted(untested)}")
print(f"PASS={counts['PASS']} FAIL={counts['FAIL']} SKIP={counts['SKIP']}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,4 @@
[[index]]
name = "pypi"
url = "https://pypi.org/simple/"
default = true