"""End-to-end self-test for all lzwcai_mcp_agile_db_third MCP tools. Runs every tool through the real MCP handler (handle_call_tool) against the backend configured below. Uses selftest_-prefixed throwaway resources and cleans them up at the end. The destructive full-replace tool is skipped. """ import asyncio import csv import json import os import sys import tempfile import uuid os.environ.setdefault("backendBaseUrl", "http://192.168.2.236:8088") # Login token (Authorization Bearer) shared with the first-party AgileDB MCP server. os.environ.setdefault( "API_KEY", "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ0b2tlbl90eXBlIjoiTE9HSU4iLCJsb2dpbl91c2VyX2tleSI6" "ImNlMDAwYjA4LWU0YTYtNGM2MS1hNzJiLWI3NTlmNmY1N2Q4NCJ9.jiNmGQZfL4-nSIFrLuaCt7mT" "5zj0FOojAVkLeHwPOroI5jBxodrCe1PSwGO1OHq5Ztb0tLEVZw2FFVj0OlTceQ", ) # Optional X-Datasource-API-Key for datasource-level permission checks (if enforced). os.environ.setdefault("datasourceApiKey", "Mggkz34Yk8cbjUvCvQ-qeooNRg62WhSwwtxUUV6e0Pg") HERE = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, HERE) from lzwcai_mcp_agile_db_third.main import handle_call_tool # noqa: E402 from lzwcai_mcp_agile_db_third.mock_db import start as start_mock_db, stop as stop_mock_db # noqa: E402 from lzwcai_mcp_agile_db_third.tools import ALL_TOOLS # noqa: E402 RESULTS = [] # (name, status, detail) TESTED = set() async def call(name, args=None): """Invoke a tool through the MCP handler and return parsed JSON.""" TESTED.add(name) try: resp = await handle_call_tool(name, args or {}) return json.loads(resp[0].text) except Exception as e: # noqa: BLE001 return {"_exception": repr(e)} def summarize(data): """Extract a short (ok, detail) from a backend response.""" if not isinstance(data, dict): return False, str(data)[:200] if "_exception" in data: return False, data["_exception"][:200] if "error" in data: return False, str(data["error"])[:200] code = data.get("code") msg = data.get("msg", "") if code == 200: return True, f"code=200 msg={msg}" if code is not None: return False, f"code={code} msg={msg}" # list/paginated or already-unwrapped payloads return True, json.dumps(data, ensure_ascii=False)[:160] def record(name, data, mode="ok"): """Record a tool result. mode="ok" -> PASS only when backend code==200 mode="roundtrip" -> PASS when the call round-trips (no exception and the backend returned a structured response), regardless of business code. Used for ops whose success depends on a real reachable DB we can't guarantee in self-test. """ ok, detail = summarize(data) if mode == "roundtrip": exception = isinstance(data, dict) and "_exception" in data passed = not exception else: passed = ok status = "PASS" if passed else "FAIL" RESULTS.append((name, status, detail)) print(f"[{status}] {name}: {detail}") return data def dig(data, *keys, default=None): """Safely walk nested dict keys.""" cur = data for k in keys: if not isinstance(cur, dict): return default cur = cur.get(k) return cur if cur is not None else default async def main(): PREFIX = f"selftest_{uuid.uuid4().hex[:8]}_" state = {} # ---- 1. read-only: connections & configs ---------------------------- print("\n=== 只读查询 ===") conns = record("list_connections", await call("list_connections", {"pageSize": 5})) # pick a builtin connection to exercise builtin/realtime tools rows = conns.get("rows") or dig(conns, "data", "rows", default=[]) or [] builtin_id = None any_conn_id = None for r in rows: cid = r.get("id") if any_conn_id is None: any_conn_id = cid if r.get("sourceType") == "builtin" and builtin_id is None: builtin_id = cid state["builtin_id"] = builtin_id state["any_conn_id"] = any_conn_id print(f" -> builtin_id={builtin_id}, any_conn_id={any_conn_id}") record("list_datasource_configs", await call("list_datasource_configs", {"pageSize": 5})) if any_conn_id: record("get_connection", await call("get_connection", {"id": any_conn_id})) record("realtime_databases", await call("realtime_databases", {"id": any_conn_id})) record("realtime_structure", await call("realtime_structure", {"id": any_conn_id})) else: for n in ("get_connection", "realtime_databases", "realtime_structure"): RESULTS.append((n, "SKIP", "no connection available")) # ---- 2. create a builtin PostgreSQL connection ---------------------- print("\n=== 创建内置连接 ===") created = record( "create_builtin_postgresql", await call("create_builtin_postgresql", { "datasourceName": PREFIX + "conn", "remark": "self-test connection", }), ) conn_id = dig(created, "data", "id") if conn_id is None and builtin_id: conn_id = builtin_id # fall back to an existing builtin for downstream tools state["conn_id"] = conn_id print(f" -> conn_id={conn_id}") if conn_id: record("update_builtin_database", await call("update_builtin_database", { "connectionId": conn_id, "datasourceName": PREFIX + "conn_renamed", "remark": "renamed by self-test", })) record("get_connection", await call("get_connection", {"id": conn_id})) # ---- 3. external connection: test/create/update/status/delete ------- print("\n=== 外部连接测试(依赖真实可达的库,可能失败属正常)===") # Try to spin up a local Docker Postgres mock. If Docker is unavailable or # the backend cannot reach the host IP, fall back to 127.0.0.1 and keep the # round-trip assertion (we still verify the tool round-trips). mock_host = "127.0.0.1" try: mock_host = start_mock_db() print(f" -> mock DB available at {mock_host}:5432") except Exception as e: # noqa: BLE001 print(f" -> could not start mock DB, using 127.0.0.1:5432 ({e})") # test_connection_config / test_connection rely on datasourceType default now record( "test_connection_config", await call("test_connection_config", { "host": mock_host, "port": 5432, "username": "postgres", "password": "postgres", }), mode="roundtrip", # may still fail if backend cannot reach mock_host ) # create_connection (external) — exercises datasourceType/connectionType defaults ext = record( "create_connection", await call("create_connection", { "datasourceName": PREFIX + "ext", "host": mock_host, "port": 5432, "username": "postgres", "password": "postgres", "remark": "self-test external", }), mode="roundtrip", ) ext_id = dig(ext, "data", "id") state["ext_id"] = ext_id print(f" -> ext_id={ext_id}") if ext_id: record("update_connection", await call("update_connection", { "id": ext_id, "datasourceName": PREFIX + "ext_renamed", "remark": "renamed", }), mode="roundtrip") record("test_connection", await call("test_connection", { "id": ext_id, "host": mock_host, "port": 5432, "username": "postgres", "password": "postgres", }), mode="roundtrip") record("change_connection_status", await call("change_connection_status", { "id": ext_id, "status": 1, }), mode="roundtrip") else: for n in ("update_connection", "test_connection", "change_connection_status"): RESULTS.append((n, "SKIP", "no external connection id returned")) # ---- 4. DDL on the builtin connection ------------------------------ print("\n=== DDL:库/表 ===") ddl_conn = conn_id or builtin_id db_name = PREFIX + "db" tbl_name = PREFIX + "users" cols = [ {"columnName": "id", "columnType": "BIGINT", "isPrimaryKey": True, "isNullable": False, "columnComment": "主键"}, {"columnName": "name", "columnType": "VARCHAR", "columnLength": 100, "isNullable": False, "columnComment": "姓名"}, {"columnName": "age", "columnType": "INTEGER", "isNullable": True, "columnComment": "年龄"}, ] if ddl_conn: record("create_database", await call("create_database", { "connectionId": ddl_conn, "databaseName": db_name, "encoding": "UTF8", }), mode="roundtrip") record("create_table", await call("create_table", { "connectionId": ddl_conn, "databaseName": db_name, "tableName": tbl_name, "tableComment": "self-test 用户表", "columns": cols, }), mode="roundtrip") record("realtime_tables", await call("realtime_tables", { "id": ddl_conn, "databaseName": db_name, }), mode="roundtrip") record("create_database_table", await call("create_database_table", { "connectionId": ddl_conn, "databaseName": PREFIX + "db2", "encoding": "UTF8", "tables": [{"tableName": PREFIX + "t2", "tableComment": "二号表", "columns": cols}], }), mode="roundtrip") record("alter_table", await call("alter_table", { "connectionId": ddl_conn, "databaseName": db_name, "tableName": tbl_name, "operations": [{"operation": "ADD_COLUMN", "column": { "columnName": "email", "columnType": "VARCHAR", "columnLength": 255, "isNullable": True, "columnComment": "邮箱"}}], }), mode="roundtrip") record("alter_database", await call("alter_database", { "connectionId": ddl_conn, "databaseName": PREFIX + "db2", "newName": PREFIX + "db2_renamed", }), mode="roundtrip") else: for n in ("create_database", "create_table", "create_database_table", "alter_table", "alter_database"): RESULTS.append((n, "SKIP", "no builtin connection for DDL")) # ---- 5. find the table id, exercise execute_sql + builtin CRUD ------ print("\n=== SQL 执行 + 内置表数据 CRUD ===") # locate a datasource config id + table id for our table cfgs = await call("list_datasource_configs", {"datasourceName": PREFIX, "pageSize": 20}) cfg_rows = cfgs.get("rows") or dig(cfgs, "data", "rows", default=[]) or [] datasource_id = cfg_rows[0].get("id") if cfg_rows else None state["datasource_id"] = datasource_id table_id = None if ddl_conn: detail = await call("get_connection", {"id": ddl_conn}) for ds in dig(detail, "data", "datasourceConfig", default=[]) or []: for t in ds.get("tables", []) or []: if str(t.get("tableName", "")).startswith(PREFIX): table_id = t.get("tableId") or t.get("id") break if table_id: break state["table_id"] = table_id print(f" -> datasource_id={datasource_id}, table_id={table_id}") if datasource_id: record("execute_sql", await call("execute_sql", { "datasourceId": datasource_id, "sql": "SELECT 1", "databaseName": db_name, }), mode="roundtrip") else: RESULTS.append(("execute_sql", "SKIP", "no datasource config id")) if table_id: record("builtin_table_insert", await call("builtin_table_insert", { "tableId": table_id, "data": {"id": 1, "name": "张三", "age": 25}, }), mode="roundtrip") record("builtin_table_data", await call("builtin_table_data", { "tableId": table_id, }), mode="roundtrip") record("builtin_table_update", await call("builtin_table_update", { "tableId": table_id, "data": {"name": "李四", "age": 30}, "primaryKey": {"id": 1}, }), mode="roundtrip") record("builtin_table_delete", await call("builtin_table_delete", { "tableId": table_id, "primaryKeys": [{"id": 1}], }), mode="roundtrip") else: for n in ("builtin_table_insert", "builtin_table_data", "builtin_table_update", "builtin_table_delete"): RESULTS.append((n, "SKIP", "no table id found")) # ---- 6. AI generate + document import (preview/confirm) ------------- print("\n=== AI 生成 + 文档导入 ===") record("generate_table", await call("generate_table", { "requirement": "一个简单的待办事项表,含标题、状态、创建时间", "databaseId": datasource_id, }), mode="roundtrip") # build a tiny CSV for import preview csv_path = os.path.join(tempfile.gettempdir(), "selftest_import.csv") with open(csv_path, "w", newline="", encoding="utf-8") as f: w = csv.writer(f) w.writerow(["name", "age", "gender"]) w.writerow(["张三", "25", "male"]) w.writerow(["李四", "30", "female"]) preview = None if ddl_conn: preview = record("import_document_preview", await call("import_document_preview", { "connectionId": ddl_conn, "filePath": csv_path, }), mode="roundtrip") ts = dig(preview, "data", "tableStructure") all_data = dig(preview, "data", "allData") if ts and all_data: record("import_document_confirm", await call("import_document_confirm", { "connectionId": ddl_conn, "tableStructure": ts, "allData": all_data, }), mode="roundtrip") else: RESULTS.append(("import_document_confirm", "SKIP", "preview returned no tableStructure/allData")) else: for n in ("import_document_preview", "import_document_confirm"): RESULTS.append((n, "SKIP", "no builtin connection")) # ---- 7. datasource config batch ops -------------------------------- print("\n=== 数据源配置批量操作 ===") if ddl_conn: record("batch_create_datasource_configs", await call( "batch_create_datasource_configs", { "connectionId": ddl_conn, "datasourceNamePrefix": PREFIX + "cfg", "syncTables": False, "databases": [{"databaseName": db_name, "tableNames": []}], }), mode="roundtrip") else: RESULTS.append(("batch_create_datasource_configs", "SKIP", "no connection")) if datasource_id: record("get_datasource_config", await call("get_datasource_config", { "id": datasource_id}), mode="roundtrip") record("change_datasource_status", await call("change_datasource_status", { "id": datasource_id, "status": 1}), mode="roundtrip") record("batch_update_datasource_configs", await call( "batch_update_datasource_configs", { "syncTables": False, "datasources": [{"id": datasource_id, "tableNames": []}], }), mode="roundtrip") else: for n in ("get_datasource_config", "change_datasource_status", "batch_update_datasource_configs"): RESULTS.append((n, "SKIP", "no datasource config id")) record("export_datasource_configs", await call("export_datasource_configs", { "datasourceName": PREFIX}), mode="roundtrip") # replace_datasource_configs intentionally skipped (destructive) RESULTS.append(("replace_datasource_configs", "SKIP", "destructive full-replace, skipped by design")) # ---- 8. cleanup ----------------------------------------------------- print("\n=== 清理 selftest_ 资源 ===") # delete selftest datasource configs cfgs2 = await call("list_datasource_configs", {"datasourceName": PREFIX, "pageSize": 50}) cfg_rows2 = cfgs2.get("rows") or dig(cfgs2, "data", "rows", default=[]) or [] del_ids = [r.get("id") for r in cfg_rows2 if r.get("id") is not None] if del_ids: record("delete_datasource_configs", await call("delete_datasource_configs", { "ids": del_ids}), mode="roundtrip") else: RESULTS.append(("delete_datasource_configs", "SKIP", "nothing to delete")) # delete selftest connections deleted_conn = False for cid in (state.get("ext_id"), conn_id): if cid: record("delete_connection", await call("delete_connection", {"id": cid}), mode="roundtrip") deleted_conn = True if not deleted_conn: RESULTS.append(("delete_connection", "SKIP", "no selftest connection to delete")) try: os.remove(csv_path) except OSError: pass try: stop_mock_db() except Exception as e: # noqa: BLE001 print(f" -> failed to stop mock DB: {e}") # ---- summary -------------------------------------------------------- print("\n" + "=" * 60) print("自测结果汇总") print("=" * 60) all_names = {t["name"] for t in ALL_TOOLS} counts = {"PASS": 0, "FAIL": 0, "SKIP": 0} for name, status, detail in RESULTS: counts[status] = counts.get(status, 0) + 1 for name, status, detail in RESULTS: print(f" [{status}] {name}: {detail}") untested = all_names - TESTED print("-" * 60) print(f"工具总数: {len(all_names)} 覆盖: {len(TESTED)} 未触达: {sorted(untested)}") print(f"PASS={counts['PASS']} FAIL={counts['FAIL']} SKIP={counts['SKIP']}") if __name__ == "__main__": asyncio.run(main())