feat(k3cloud): 新增金蝶云星空 MCP 服务器

新增完整的 K3Cloud MCP 服务器实现,包含配置管理、API 客户端、工具定义和签名验证
更新文件工具 MCP 服务器的 MinIO 配置和上传路径前缀
This commit is contained in:
2026-05-06 16:29:34 +08:00
parent 49bdf45bfa
commit ede63f6e92
13 changed files with 491 additions and 5 deletions

View File

@@ -0,0 +1,50 @@
import json
from typing import Any, Dict
import requests
from .config import Config
from .signing import build_headers
def build_save_url(path: str) -> str:
"""Build the Save API URL."""
return f"{Config.BASE_URL.rstrip('/')}{path}"
def save(formid: str, data_payload: Any, timeout: int = 30) -> Dict[str, Any]:
"""
Call the K3Cloud Save API.
`data_payload` can be a JSON object or a pre-serialized JSON string.
"""
save_service_path = "/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.Save.common.kdsvc"
normalized_formid = str(formid or "").strip()
if not normalized_formid:
raise ValueError("formid 不能为空")
if data_payload is None:
raise ValueError("data_payload 不能为空")
if isinstance(data_payload, str):
payload_value = data_payload.strip()
if not payload_value:
raise ValueError("data_payload 不能为空字符串")
else:
try:
payload_value = json.dumps(data_payload, ensure_ascii=False)
except TypeError as exc:
raise TypeError("data_payload 必须是可序列化的 JSON 对象或 JSON 字符串") from exc
payload = {
"formid": normalized_formid,
"data": payload_value,
}
response = requests.post(
build_save_url(save_service_path),
json=payload,
headers=build_headers(save_service_path),
timeout=timeout,
)
response.raise_for_status()
return response.json()