Files
lzwcai-mcp-server-package/lzwcai_mcp_agile_db_third/selftest_tools.py
yuanzhipeng ba5cd4bbe1 feat(lzwcai-agile-db): 更新AgileDB技能至v0.4.2版本并扩展工具集
- 将技能版本从0.2.0升级至0.4.2
- 工具数量从33个扩展至57个,新增数据源管理、AI训练、库表关联配置等功能
- 新增MQTT字段关联同步模块(8个工具)和库表关联配置(3个工具)
- 添加重要的契约提示和安全确认原则,包括target默认值、alter_table操作限制等
- 修正工具参数说明,如execute_sql的executableSql改为sql,参数结构优化
- 增强安全机制,明确危险操作的用户确认流程和目标资源选择规则
- 更新README.md中的工具数量统计和功能描述
2026-06-17 14:40:43 +08:00

419 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""End-to-end self-test for all lzwcai_mcp_agile_db_third MCP tools.
Runs every tool through the real MCP handler (handle_call_tool) against the
backend configured below. Uses selftest_-prefixed throwaway resources and
cleans them up at the end. The destructive full-replace tool is skipped.
"""
import asyncio
import csv
import json
import os
import sys
import tempfile
import uuid
# Backend defaults for the self-test. `setdefault` only fills a variable that
# is not already present in the environment, so exported values take priority.
os.environ.setdefault("backendBaseUrl", "http://192.168.2.236:8088")
# Login token (Authorization Bearer) shared with the first-party AgileDB MCP server.
# NOTE(review): a real-looking bearer token is committed here as the default.
# Prefer supplying API_KEY from the environment and rotating this value.
os.environ.setdefault(
"API_KEY",
"Bearer eyJhbGciOiJIUzUxMiJ9.eyJ0b2tlbl90eXBlIjoiTE9HSU4iLCJsb2dpbl91c2VyX2tleSI6"
"ImNlMDAwYjA4LWU0YTYtNGM2MS1hNzJiLWI3NTlmNmY1N2Q4NCJ9.jiNmGQZfL4-nSIFrLuaCt7mT"
"5zj0FOojAVkLeHwPOroI5jBxodrCe1PSwGO1OHq5Ztb0tLEVZw2FFVj0OlTceQ",
)
# Optional X-Datasource-API-Key for datasource-level permission checks (if enforced).
# NOTE(review): same concern as above - this key is committed in plain text.
os.environ.setdefault("datasourceApiKey", "Mggkz34Yk8cbjUvCvQ-qeooNRg62WhSwwtxUUV6e0Pg")
# Directory containing this script, pushed to the front of the import path
# before the project imports below (hence the E402 suppressions).
# NOTE(review): presumably this is the directory that *contains* the
# `lzwcai_mcp_agile_db_third` package - confirm against the repo layout.
HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE)
from lzwcai_mcp_agile_db_third.main import handle_call_tool # noqa: E402
from lzwcai_mcp_agile_db_third.mock_db import start as start_mock_db, stop as stop_mock_db # noqa: E402
from lzwcai_mcp_agile_db_third.tools import ALL_TOOLS # noqa: E402
# Accumulated outcomes, one tuple per recorded or skipped tool, in run order.
RESULTS = [] # (name, status, detail)
# Names of every tool that was actually invoked through `call`.
TESTED = set()
async def call(name, args=None):
    """Run one tool via the MCP handler and decode its JSON reply.

    The tool name is added to ``TESTED`` before the call is attempted. Any
    exception raised while invoking the handler or decoding the first
    response item is captured and returned as ``{"_exception": repr(exc)}``
    instead of propagating.
    """
    TESTED.add(name)
    try:
        response = await handle_call_tool(name, args or {})
        first_item = response[0]
        parsed = json.loads(first_item.text)
    except Exception as exc:  # noqa: BLE001
        return {"_exception": repr(exc)}
    return parsed
def summarize(data):
    """Reduce a backend response to a short ``(ok, detail)`` pair.

    Rules, applied in order:
      * non-dict payloads are failures; detail is ``str(data)`` (200 chars max)
      * an ``_exception`` entry (set by ``call``) is a failure
      * an ``error`` entry whose value is not ``None`` is a failure
      * a ``code`` of 200 (int, or the string ``"200"``) is a success
      * any other non-None ``code`` is a failure
      * with no ``code`` at all the payload is treated as an already-unwrapped
        list/paginated result and counts as a success

    Returns:
        tuple[bool, str]: success flag and a truncated human-readable detail.
    """
    if not isinstance(data, dict):
        return False, str(data)[:200]
    if "_exception" in data:
        return False, str(data["_exception"])[:200]
    # A null "error" field next to a valid code is not an error report;
    # only a populated field is treated as a failure.
    if data.get("error") is not None:
        return False, str(data["error"])[:200]

    code = data.get("code")
    msg = data.get("msg", "")
    if code is None:
        # list/paginated or already-unwrapped payloads
        return True, json.dumps(data, ensure_ascii=False)[:160]

    # Accept a numeric-string code as well as an int one.
    is_success = code == 200 or (isinstance(code, str) and code.strip() == "200")
    if is_success:
        return True, f"code=200 msg={msg}"
    return False, f"code={code} msg={msg}"
def record(name, data, mode="ok"):
    """Classify one tool response, log it, and hand the response back.

    mode="ok"        -> PASS only when ``summarize`` reports success
    mode="roundtrip" -> PASS unless ``data`` is a dict carrying an
                        ``_exception`` entry; the business code is ignored.
                        Used for operations whose success depends on a real
                        reachable DB that the self-test cannot guarantee.

    Appends ``(name, status, detail)`` to ``RESULTS``, prints the same line,
    and returns ``data`` unchanged so callers can keep inspecting it.
    """
    ok, detail = summarize(data)
    if mode != "roundtrip":
        passed = ok
    else:
        raised = isinstance(data, dict) and "_exception" in data
        passed = not raised
    if passed:
        status = "PASS"
    else:
        status = "FAIL"
    entry = (name, status, detail)
    RESULTS.append(entry)
    print(f"[{status}] {name}: {detail}")
    return data
def dig(data, *keys, default=None):
    """Follow ``keys`` through nested dicts without raising.

    Returns ``default`` as soon as a non-dict is met while keys remain, and
    also when the value finally reached is ``None``. Falsy values other than
    ``None`` (0, "", []) are returned as they are.
    """
    node = data
    for key in keys:
        if isinstance(node, dict):
            node = node.get(key)
        else:
            return default
    if node is None:
        return default
    return node
def _rows_of(resp):
    """Return the row dicts of a list-style response, or [] if there are none.

    Looks at ``resp["rows"]`` first and ``resp["data"]["rows"]`` second.
    Non-dict responses, non-list row containers and non-dict rows are
    dropped so callers can use ``.get`` on every returned element.
    """
    if not isinstance(resp, dict):
        return []
    rows = resp.get("rows") or dig(resp, "data", "rows", default=[]) or []
    if not isinstance(rows, list):
        return []
    return [r for r in rows if isinstance(r, dict)]


def _owned_rows(rows, prefix):
    """Drop rows that visibly belong to somebody else.

    A row is rejected only when it carries a ``datasourceName`` that does not
    contain this run's prefix; rows without that field are kept because they
    cannot be checked.
    NOTE(review): assumes list rows expose ``datasourceName`` - confirm
    against the backend response schema.
    """
    return [
        r for r in rows
        if r.get("datasourceName") is None or prefix in str(r.get("datasourceName"))
    ]


def _skip(names, reason):
    """Register every tool in ``names`` as SKIP with the given reason."""
    for n in names:
        RESULTS.append((n, "SKIP", reason))


def _find_table_id(detail, prefix):
    """Return the id of the first prefixed table in a get_connection reply.

    Within each datasource config only the first table whose name starts with
    ``prefix`` is considered; if it has no usable id, the search moves on to
    the next datasource config.
    """
    for ds in dig(detail, "data", "datasourceConfig", default=[]) or []:
        if not isinstance(ds, dict):
            continue
        for t in ds.get("tables", []) or []:
            if not isinstance(t, dict):
                continue
            if str(t.get("tableName", "")).startswith(prefix):
                table_id = t.get("tableId") or t.get("id")
                if table_id:
                    return table_id
                break
    return None


async def _run_checks(prefix, state, csv_path):
    """Exercise the tools (sections 1-7); ids needed for cleanup go in ``state``."""
    # ---- 1. read-only: connections & configs ----------------------------
    print("\n=== 只读查询 ===")
    conns = record("list_connections", await call("list_connections", {"pageSize": 5}))
    # pick a builtin connection to exercise builtin/realtime tools
    builtin_id = None
    any_conn_id = None
    for r in _rows_of(conns):
        cid = r.get("id")
        if any_conn_id is None:
            any_conn_id = cid
        if r.get("sourceType") == "builtin" and builtin_id is None:
            builtin_id = cid
    state["builtin_id"] = builtin_id
    state["any_conn_id"] = any_conn_id
    print(f" -> builtin_id={builtin_id}, any_conn_id={any_conn_id}")
    record("list_datasource_configs", await call("list_datasource_configs", {"pageSize": 5}))
    if any_conn_id:
        record("get_connection", await call("get_connection", {"id": any_conn_id}))
        record("realtime_databases", await call("realtime_databases", {"id": any_conn_id}))
        record("realtime_structure", await call("realtime_structure", {"id": any_conn_id}))
    else:
        _skip(("get_connection", "realtime_databases", "realtime_structure"),
              "no connection available")

    # ---- 2. create a builtin PostgreSQL connection ----------------------
    print("\n=== 创建内置连接 ===")
    created = record(
        "create_builtin_postgresql",
        await call("create_builtin_postgresql", {
            "datasourceName": prefix + "conn",
            "remark": "self-test connection",
        }),
    )
    # Remember separately what this run created: only that connection may be
    # renamed or deleted. The fallback below is somebody else's resource.
    created_conn_id = dig(created, "data", "id")
    state["created_conn_id"] = created_conn_id
    conn_id = created_conn_id
    if conn_id is None and builtin_id:
        conn_id = builtin_id  # fall back to an existing builtin for downstream tools
    state["conn_id"] = conn_id
    print(f" -> conn_id={conn_id}")
    if created_conn_id:
        record("update_builtin_database", await call("update_builtin_database", {
            "connectionId": created_conn_id,
            "datasourceName": prefix + "conn_renamed",
            "remark": "renamed by self-test",
        }))
    else:
        _skip(("update_builtin_database",),
              "fallback connection is not owned by this run")
    if conn_id:
        record("get_connection", await call("get_connection", {"id": conn_id}))

    # ---- 3. external connection: test/create/update/status/delete -------
    print("\n=== 外部连接测试(依赖真实可达的库,可能失败属正常)===")
    # Try to spin up a local Docker Postgres mock. If Docker is unavailable or
    # the backend cannot reach the host IP, fall back to 127.0.0.1 and keep the
    # round-trip assertion (we still verify the tool round-trips).
    mock_host = "127.0.0.1"
    state["mock_attempted"] = True  # tells cleanup that stop_mock_db() is due
    try:
        mock_host = start_mock_db()
        print(f" -> mock DB available at {mock_host}:5432")
    except Exception as e:  # noqa: BLE001
        print(f" -> could not start mock DB, using 127.0.0.1:5432 ({e})")
    # test_connection_config / test_connection rely on datasourceType default now
    record(
        "test_connection_config",
        await call("test_connection_config", {
            "host": mock_host, "port": 5432,
            "username": "postgres", "password": "postgres",
        }),
        mode="roundtrip",  # may still fail if backend cannot reach mock_host
    )
    # create_connection (external) - exercises datasourceType/connectionType defaults
    ext = record(
        "create_connection",
        await call("create_connection", {
            "datasourceName": prefix + "ext",
            "host": mock_host, "port": 5432,
            "username": "postgres", "password": "postgres",
            "remark": "self-test external",
        }),
        mode="roundtrip",
    )
    ext_id = dig(ext, "data", "id")
    state["ext_id"] = ext_id
    print(f" -> ext_id={ext_id}")
    if ext_id:
        record("update_connection", await call("update_connection", {
            "id": ext_id, "datasourceName": prefix + "ext_renamed",
            "remark": "renamed",
        }), mode="roundtrip")
        record("test_connection", await call("test_connection", {
            "id": ext_id, "host": mock_host, "port": 5432,
            "username": "postgres", "password": "postgres",
        }), mode="roundtrip")
        record("change_connection_status", await call("change_connection_status", {
            "id": ext_id, "status": 1,
        }), mode="roundtrip")
    else:
        _skip(("update_connection", "test_connection", "change_connection_status"),
              "no external connection id returned")

    # ---- 4. DDL on the builtin connection ------------------------------
    print("\n=== DDL库/表 ===")
    ddl_conn = conn_id or builtin_id
    db_name = prefix + "db"
    tbl_name = prefix + "users"
    cols = [
        {"columnName": "id", "columnType": "BIGINT", "isPrimaryKey": True,
         "isNullable": False, "columnComment": "主键"},
        {"columnName": "name", "columnType": "VARCHAR", "columnLength": 100,
         "isNullable": False, "columnComment": "姓名"},
        {"columnName": "age", "columnType": "INTEGER", "isNullable": True,
         "columnComment": "年龄"},
    ]
    if ddl_conn:
        record("create_database", await call("create_database", {
            "connectionId": ddl_conn, "databaseName": db_name,
            "encoding": "UTF8",
        }), mode="roundtrip")
        record("create_table", await call("create_table", {
            "connectionId": ddl_conn, "databaseName": db_name,
            "tableName": tbl_name, "tableComment": "self-test 用户表",
            "columns": cols,
        }), mode="roundtrip")
        record("realtime_tables", await call("realtime_tables", {
            "id": ddl_conn, "databaseName": db_name,
        }), mode="roundtrip")
        record("create_database_table", await call("create_database_table", {
            "connectionId": ddl_conn, "databaseName": prefix + "db2",
            "encoding": "UTF8",
            "tables": [{"tableName": prefix + "t2", "tableComment": "二号表",
                        "columns": cols}],
        }), mode="roundtrip")
        record("alter_table", await call("alter_table", {
            "connectionId": ddl_conn, "databaseName": db_name,
            "tableName": tbl_name,
            "operations": [{"operation": "ADD_COLUMN", "column": {
                "columnName": "email", "columnType": "VARCHAR",
                "columnLength": 255, "isNullable": True, "columnComment": "邮箱"}}],
        }), mode="roundtrip")
        record("alter_database", await call("alter_database", {
            "connectionId": ddl_conn, "databaseName": prefix + "db2",
            "newName": prefix + "db2_renamed",
        }), mode="roundtrip")
    else:
        # realtime_tables is listed too so the summary shows every skipped tool
        _skip(("create_database", "create_table", "realtime_tables",
               "create_database_table", "alter_table", "alter_database"),
              "no builtin connection for DDL")

    # ---- 5. find the table id, exercise execute_sql + builtin CRUD ------
    print("\n=== SQL 执行 + 内置表数据 CRUD ===")
    # locate a datasource config id + table id for our table
    cfgs = await call("list_datasource_configs", {"datasourceName": prefix, "pageSize": 20})
    # Mutating tools run against this id, so never pick a row that visibly
    # belongs to another owner.
    cfg_rows = _owned_rows(_rows_of(cfgs), prefix)
    datasource_id = cfg_rows[0].get("id") if cfg_rows else None
    state["datasource_id"] = datasource_id
    table_id = None
    if ddl_conn:
        detail = await call("get_connection", {"id": ddl_conn})
        table_id = _find_table_id(detail, prefix)
    state["table_id"] = table_id
    print(f" -> datasource_id={datasource_id}, table_id={table_id}")
    if datasource_id:
        record("execute_sql", await call("execute_sql", {
            "datasourceId": datasource_id,
            "sql": "SELECT 1",
            "databaseName": db_name,
        }), mode="roundtrip")
    else:
        _skip(("execute_sql",), "no datasource config id")
    if table_id:
        record("builtin_table_insert", await call("builtin_table_insert", {
            "tableId": table_id, "data": {"id": 1, "name": "张三", "age": 25},
        }), mode="roundtrip")
        record("builtin_table_data", await call("builtin_table_data", {
            "tableId": table_id,
        }), mode="roundtrip")
        record("builtin_table_update", await call("builtin_table_update", {
            "tableId": table_id, "data": {"name": "李四", "age": 30},
            "primaryKey": {"id": 1},
        }), mode="roundtrip")
        record("builtin_table_delete", await call("builtin_table_delete", {
            "tableId": table_id, "primaryKeys": [{"id": 1}],
        }), mode="roundtrip")
    else:
        _skip(("builtin_table_insert", "builtin_table_data",
               "builtin_table_update", "builtin_table_delete"),
              "no table id found")

    # ---- 6. AI generate + document import (preview/confirm) -------------
    print("\n=== AI 生成 + 文档导入 ===")
    record("generate_table", await call("generate_table", {
        "requirement": "一个简单的待办事项表,含标题、状态、创建时间",
        "databaseId": datasource_id,
    }), mode="roundtrip")
    # build a tiny CSV for import preview
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["name", "age", "gender"])
        w.writerow(["张三", "25", "male"])
        w.writerow(["李四", "30", "female"])
    if ddl_conn:
        preview = record("import_document_preview", await call("import_document_preview", {
            "connectionId": ddl_conn, "filePath": csv_path,
        }), mode="roundtrip")
        ts = dig(preview, "data", "tableStructure")
        all_data = dig(preview, "data", "allData")
        if ts and all_data:
            record("import_document_confirm", await call("import_document_confirm", {
                "connectionId": ddl_conn, "tableStructure": ts, "allData": all_data,
            }), mode="roundtrip")
        else:
            _skip(("import_document_confirm",),
                  "preview returned no tableStructure/allData")
    else:
        _skip(("import_document_preview", "import_document_confirm"),
              "no builtin connection")

    # ---- 7. datasource config batch ops --------------------------------
    print("\n=== 数据源配置批量操作 ===")
    if ddl_conn:
        record("batch_create_datasource_configs", await call(
            "batch_create_datasource_configs", {
                "connectionId": ddl_conn,
                "datasourceNamePrefix": prefix + "cfg",
                "syncTables": False,
                "databases": [{"databaseName": db_name, "tableNames": []}],
            }), mode="roundtrip")
    else:
        _skip(("batch_create_datasource_configs",), "no connection")
    if datasource_id:
        record("get_datasource_config", await call("get_datasource_config", {
            "id": datasource_id}), mode="roundtrip")
        record("change_datasource_status", await call("change_datasource_status", {
            "id": datasource_id, "status": 1}), mode="roundtrip")
        record("batch_update_datasource_configs", await call(
            "batch_update_datasource_configs", {
                "syncTables": False,
                "datasources": [{"id": datasource_id, "tableNames": []}],
            }), mode="roundtrip")
    else:
        _skip(("get_datasource_config", "change_datasource_status",
               "batch_update_datasource_configs"),
              "no datasource config id")
    record("export_datasource_configs", await call("export_datasource_configs", {
        "datasourceName": prefix}), mode="roundtrip")
    # replace_datasource_configs intentionally skipped (destructive)
    _skip(("replace_datasource_configs",),
          "destructive full-replace, skipped by design")


async def _cleanup(prefix, state, csv_path):
    """Remove what this run created (section 8); safe after a partial run."""
    print("\n=== 清理 selftest_ 资源 ===")
    # delete selftest datasource configs
    cfgs = await call("list_datasource_configs", {"datasourceName": prefix, "pageSize": 50})
    # Re-check ownership client-side before a bulk delete instead of trusting
    # the server-side name filter alone.
    del_ids = [r.get("id") for r in _owned_rows(_rows_of(cfgs), prefix)
               if r.get("id") is not None]
    if del_ids:
        record("delete_datasource_configs", await call("delete_datasource_configs", {
            "ids": del_ids}), mode="roundtrip")
    else:
        _skip(("delete_datasource_configs",), "nothing to delete")
    # delete selftest connections - only ids returned by this run's create calls,
    # never the pre-existing builtin connection used as a fallback
    deleted_conn = False
    for cid in (state.get("ext_id"), state.get("created_conn_id")):
        if cid:
            record("delete_connection", await call("delete_connection", {"id": cid}),
                   mode="roundtrip")
            deleted_conn = True
    if not deleted_conn:
        _skip(("delete_connection",), "no selftest connection to delete")
    try:
        os.remove(csv_path)
    except OSError:
        # best effort: the file may never have been written
        pass
    if state.get("mock_attempted"):
        try:
            stop_mock_db()
        except Exception as e:  # noqa: BLE001
            print(f" -> failed to stop mock DB: {e}")


def _print_summary():
    """Print every recorded result plus coverage and PASS/FAIL/SKIP totals."""
    print("\n" + "=" * 60)
    print("自测结果汇总")
    print("=" * 60)
    all_names = {t["name"] for t in ALL_TOOLS}
    counts = {"PASS": 0, "FAIL": 0, "SKIP": 0}
    for name, status, detail in RESULTS:
        counts[status] = counts.get(status, 0) + 1
        print(f" [{status}] {name}: {detail}")
    untested = all_names - TESTED
    print("-" * 60)
    print(f"工具总数: {len(all_names)} 覆盖: {len(TESTED)} 未触达: {sorted(untested)}")
    print(f"PASS={counts['PASS']} FAIL={counts['FAIL']} SKIP={counts['SKIP']}")


async def main():
    """Run the whole self-test: exercise the tools, clean up, print a summary.

    Resources are named with a per-run random ``selftest_<hex>_`` prefix.
    Cleanup and the summary run in ``finally`` blocks, so an unexpected
    exception part-way through still removes the temporary CSV, stops the
    mock DB if it was attempted, and deletes the connections this run
    created. The exception then propagates to the caller.
    """
    PREFIX = f"selftest_{uuid.uuid4().hex[:8]}_"
    state = {}
    csv_path = os.path.join(tempfile.gettempdir(), "selftest_import.csv")
    try:
        await _run_checks(PREFIX, state, csv_path)
    finally:
        try:
            await _cleanup(PREFIX, state, csv_path)
        finally:
            _print_summary()
# Script entry point: only when executed directly (not on import), drive the
# async self-test to completion with asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())