- 在 client.py 中新增 view() 函数,支持调用金蝶云星空 DynamicFormService.View 接口
- 在 tools.py 中新增 view_form 工具,供 MCP 客户端调用
- 添加演示脚本 demo_view.py 和示例配置文件 demo_view.json
- 重构 demo_save.py 为通用演示脚本,支持自定义 payload 文件
- 删除过时的 demo.json,新增 userful_save.json 作为实用示例
- 添加项目交接文档,详细说明各子项目架构和配置
- 优化 payload 序列化逻辑,提取为共享函数 _serialize_payload
73 lines
2.0 KiB
Python
73 lines
2.0 KiB
Python
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Absolute directory that contains this script (symlinks resolved).
CURRENT_DIR = Path(__file__).resolve().parent
# One level above the script directory; inserted into sys.path before the
# k3cloud_mcp import in main().
PROJECT_ROOT = CURRENT_DIR.parent
# Payload file read when --payload is not given on the command line.
DEFAULT_PAYLOAD_PATH = CURRENT_DIR.joinpath("demo_save.json")
# Config file whose "env" section seeds missing environment variables.
MCP_SERVER_PATH = PROJECT_ROOT.joinpath("mcp-server.json")
|
|
|
|
|
|
def _ensure_project_root_on_path() -> None:
    """Put PROJECT_ROOT at the front of ``sys.path`` unless it is already listed."""
    project_dir = str(PROJECT_ROOT)
    if project_dir in sys.path:
        # Already importable from here; do not add a duplicate entry.
        return
    sys.path.insert(0, project_dir)
|
|
|
|
|
|
def _load_payload(payload_path: Path) -> Any:
    """Read a JSON payload file and return the decoded value.

    The file is decoded as UTF-8. A leading byte-order mark (commonly written
    by Windows editors) is skipped so that it neither breaks JSON parsing nor
    hides an otherwise empty file.

    Args:
        payload_path: Path of the JSON file to read.

    Returns:
        The value produced by ``json.loads`` for the file content.

    Raises:
        ValueError: If the file is empty or contains only whitespace.
            Malformed JSON raises ``json.JSONDecodeError``, which is a
            ``ValueError`` subclass.
        OSError: If the file cannot be read.
    """
    # "utf-8-sig" decodes exactly like "utf-8" but drops a leading BOM, which
    # json.loads would otherwise reject with "Unexpected UTF-8 BOM".
    raw_text = payload_path.read_text(encoding="utf-8-sig").strip()
    if not raw_text:
        raise ValueError(f"payload 文件为空: {payload_path}")
    return json.loads(raw_text)
|
|
|
|
|
|
def _bootstrap_env_from_mcp_server() -> None:
    """Seed missing environment variables from ``mcp-server.json``.

    Reads the ``mcpServers -> k3cloud-mcp -> env`` object from MCP_SERVER_PATH
    and copies each non-null entry into ``os.environ`` as a string. Variables
    that are already set in the environment are never overwritten.

    The function returns without doing anything when the file is absent or
    when any level of that path is missing, ``null`` or not a JSON object.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: If the file exists but cannot be read.
    """
    if not MCP_SERVER_PATH.is_file():
        return

    # "utf-8-sig" also accepts files saved with a leading BOM.
    node: Any = json.loads(MCP_SERVER_PATH.read_text(encoding="utf-8-sig"))

    # Walk down level by level so that a null or non-object value anywhere on
    # the path ends the bootstrap quietly instead of raising AttributeError.
    for section in ("mcpServers", "k3cloud-mcp", "env"):
        if not isinstance(node, dict):
            return
        node = node.get(section)
    if not isinstance(node, dict):
        return

    for key, value in node.items():
        if value is not None:
            # setdefault keeps whatever the caller already exported.
            os.environ.setdefault(key, str(value))
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: save one K3Cloud form from a JSON payload file.

    Parses ``formid``, ``--payload`` and ``--timeout``, loads the payload,
    makes the project importable, seeds environment variables from
    ``mcp-server.json``, calls ``k3cloud_mcp.client.save()`` and prints the
    result as indented JSON.

    Exits through ``argparse`` (status 2) when the payload file does not exist.
    """
    # Derive the file name shown in the help text from the real default so the
    # two cannot drift apart.
    default_name = DEFAULT_PAYLOAD_PATH.name

    parser = argparse.ArgumentParser(
        description=f"读取同级 {default_name} 并直接调用 k3cloud_mcp.client.save()"
    )
    parser.add_argument("formid", help="K3Cloud 表单标识,例如 BD_Customer")
    parser.add_argument(
        "--payload",
        default=str(DEFAULT_PAYLOAD_PATH),
        help=f"payload JSON 文件路径,默认读取同级 {default_name}",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=30,
        help="请求超时时间,单位秒,默认 30",
    )
    args = parser.parse_args()

    payload_path = Path(args.payload).expanduser().resolve()
    if not payload_path.is_file():
        # Report a bad path as a usage error rather than a raw traceback.
        parser.error(f"payload 文件不存在: {payload_path}")

    # Validate the input before touching sys.path or the environment.
    payload = _load_payload(payload_path)

    _ensure_project_root_on_path()
    _bootstrap_env_from_mcp_server()

    # Imported late on purpose: the two calls above run before this import.
    from k3cloud_mcp.client import save

    result = save(
        formid=args.formid,
        data_payload=payload,
        timeout=args.timeout,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|