Compare commits

...

10 Commits

Author SHA1 Message Date
ede63f6e92 feat(k3cloud): 新增金蝶云星空 MCP 服务器
新增完整的 K3Cloud MCP 服务器实现,包含配置管理、API 客户端、工具定义和签名验证
更新文件工具 MCP 服务器的 MinIO 配置和上传路径前缀
2026-05-06 16:29:34 +08:00
49bdf45bfa feat(tools): 支持向多个接收者发送卡片消息
重构 send_card_message 为 send_notion_card,支持接收者 ID 列表
更新 .gitignore 以忽略更多临时文件
升级项目版本至 0.1.17
删除未使用的 card.txt 模板文件
2026-04-09 11:25:34 +08:00
卢永锵
e4909f159d Merge pull request 'feishuokr' (#2) from feishuokr into master
Reviewed-on: http://git.lzwcai.com:3000/tanjianbin/lzwcai-mcp/pulls/2
2026-04-03 09:44:23 +00:00
357541a776 chore: 更新项目版本号至0.1.16 2026-04-03 17:40:17 +08:00
7d4400bb23 feat(tools): 添加获取用户OKR列表和批量查询OKR详情的工具
新增两个与飞书OKR API集成的工具函数:
- get_user_okr_list: 根据用户ID分页查询其OKR列表,支持按周期筛选和语言选择
- batch_get_okr: 根据OKR ID批量获取详细的目标及关键结果信息,并格式化输出进度、状态、截止日期等关键信息

同时在handle_call_tool中添加对应的调用处理逻辑。
2026-04-03 16:52:47 +08:00
3b881bf8c8 feat(temi_mcp): 添加跳舞工具并修复卡片消息发送
为轮足机器人添加跳舞功能,包含新的 MCP 工具和服务器方法。同时修复 Lark MCP 中发送卡片消息时 person_id 处理的问题,避免在 person_id 为 None 时产生异常。更新两个包的版本号以反映变更。
2026-04-03 15:55:23 +08:00
894ee1dfbf feat(lark_mcp): 更新飞书卡片消息发送工具以支持单接收者
- 将send_card_message工具从接收ID列表改为单个接收者ID
- 修复test.py中导入并调用send_card_message的测试逻辑
- 新增机器人充电、终止任务、通知播放功能
- 扩展迎宾和巡逻功能参数,提供默认值和更灵活的配置
- 更新项目版本号至0.1.13和0.1.9
2026-03-19 10:32:09 +08:00
bin
3d04166314 feat: 新增轮足机器人导航MCP服务器
添加 lzwcai_temi_mcp 包,实现基于MQTT的轮足机器人导航控制服务器。包含以下功能:
- 通过 MQTT 发布机器人控制命令
- 提供 goto、speak、reception、repose、patrol 等导航工具
- 支持通过 MCP 协议与AI助手集成
- 更新 lzwcai_lark_mcp 版本至 0.1.11
2026-03-15 18:36:34 +08:00
bin
7daa8e46c2 build: 更新项目版本至0.1.10 2026-03-09 15:53:59 +08:00
bin
10fbb58b70 test: 更新测试脚本以独立测试飞书卡片发送功能
- 将 test.py 改为同步请求方式,移除对 mcp 模块的依赖
- 新增 test_send_card.py 用于测试卡片模板的加载与渲染
- 更新资产确认卡片的参数格式,支持更灵活的资产列表结构
- 改进环境变量读取逻辑,兼容新旧配置键名
2026-03-04 10:35:02 +08:00
27 changed files with 2588 additions and 257 deletions

4
.gitignore vendored
View File

@@ -1 +1,5 @@
__pycache__/
*.txt
.DS*
test*

View File

@@ -387,7 +387,7 @@ def _upload_file_to_minio_sync(file_path: str) -> str:
secure = endpoint_secure or False
client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure)
bucket = "lzwcai"
prefix = "tmp"
prefix = "upload"
if not client.bucket_exists(bucket):
client.make_bucket(bucket)
date_str = datetime.now().strftime("%Y-%m-%d")

View File

@@ -9,9 +9,9 @@
"lzwcai-file-tools-mcp"
],
"env": {
"minio_endpoint": "http://192.168.11.24:9000",
"minio_access_key": "cXk8WPR3ix86J9aGK6tH",
"minio_secret_key": "FSH8g3tx8bTR4w8BZmwl35WvWbOXZvfvCcivRRJE"
"minio_endpoint": "http://hyy-minio.awin25.com:1800",
"minio_access_key": "orOXTOpVfRtYzovP",
"minio_secret_key": "4EOMjjbrji1DHW0EBSlYA7JqBnJUy0aj"
}
}
}

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "lzwcai-file-tools-mcp"
version = "0.1.0"
version = "0.1.1"
description = "File tools MCP server"
requires-python = ">=3.10"
dependencies = [

View File

@@ -0,0 +1,3 @@
"""K3Cloud MCP package."""
__all__ = ["main"]

View File

@@ -0,0 +1,50 @@
import json
from typing import Any, Dict
import requests
from .config import Config
from .signing import build_headers
def build_save_url(path: str) -> str:
"""Build the Save API URL."""
return f"{Config.BASE_URL.rstrip('/')}{path}"
def save(formid: str, data_payload: Any, timeout: int = 30) -> Dict[str, Any]:
"""
Call the K3Cloud Save API.
`data_payload` can be a JSON object or a pre-serialized JSON string.
"""
save_service_path = "/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.Save.common.kdsvc"
normalized_formid = str(formid or "").strip()
if not normalized_formid:
raise ValueError("formid 不能为空")
if data_payload is None:
raise ValueError("data_payload 不能为空")
if isinstance(data_payload, str):
payload_value = data_payload.strip()
if not payload_value:
raise ValueError("data_payload 不能为空字符串")
else:
try:
payload_value = json.dumps(data_payload, ensure_ascii=False)
except TypeError as exc:
raise TypeError("data_payload 必须是可序列化的 JSON 对象或 JSON 字符串") from exc
payload = {
"formid": normalized_formid,
"data": payload_value,
}
response = requests.post(
build_save_url(save_service_path),
json=payload,
headers=build_headers(save_service_path),
timeout=timeout,
)
response.raise_for_status()
return response.json()

View File

@@ -0,0 +1,18 @@
import os
def _require_env(name: str) -> str:
value = os.getenv(name, "").strip()
if not value:
raise ValueError(f"missing required environment variable: {name}")
return value
class Config:
BASE_URL = _require_env("K3CLOUD_BASE_URL")
ACCT_ID = _require_env("K3CLOUD_ACCT_ID")
APP_ID = _require_env("K3CLOUD_APP_ID")
USERNAME = _require_env("K3CLOUD_USERNAME")
APP_SECRET = _require_env("K3CLOUD_APP_SECRET")
LCID = int(os.getenv("K3CLOUD_LCID", "2052"))
ORG_NUM = int(os.getenv("K3CLOUD_ORG_NUM", "0"))

View File

@@ -0,0 +1,78 @@
{
"NeedUpDateFields": [],
"NeedReturnFields": [],
"IsDeleteEntry": "true",
"SubSystemId": "",
"IsVerifyBaseDataField": "false",
"IsEntryBatchFill": "true",
"ValidateFlag": "true",
"NumberSearch": "true",
"IsAutoAdjustField": "false",
"InterationFlags": "",
"IgnoreInterationFlag": "",
"IsControlPrecision": "false",
"ValidateRepeatJson": "false",
"Model": {
"FID": 0,
"FBillTypeID": {
"FNUMBER": ""
},
"FBillNo": "1",
"FPAYORGID": {
"FNumber": ""
},
"FDATE": "1900-01-01",
"FCONTACTUNITTYPE": "",
"FCONTACTUNIT": {
"FNumber": ""
},
"FPAYUNITTYPE": "",
"FPAYUNIT": {
"FNumber": ""
},
"FCURRENCYID": {
"FNumber": ""
},
"FSETTLECUR": {
"FNUMBER": ""
},
"FDOCUMENTSTATUS": "",
"FBUSINESSTYPE": "",
"FCancelStatus": "",
"FSETTLEMAINBOOKID": {
"FNUMBER": ""
},
"FRECEIVEBILLENTRY": [
{
"FEntryID": 0,
"FSETTLETYPEID": {
"FNumber": ""
},
"FPURPOSEID": {
"FNumber": "SFKYT01_SYS"
},
"FPOSTDATE": "1900-01-01",
"FASSSALESORDER": [
{
"FDetailID": 0
}
]
}
],
"FRECEIVEBILLSRCENTRY": [
{
"FEntryID": 0
}
],
"FBILLRECEIVABLEENTRY": [
{
"FEntryID": 0
}
],
"FBILLSKDRECENTRY": [
{
"FEntryID": 0
}
]
}
}

View File

@@ -0,0 +1,72 @@
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any
CURRENT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = CURRENT_DIR.parent
DEFAULT_PAYLOAD_PATH = CURRENT_DIR / "demo.json"
MCP_SERVER_PATH = PROJECT_ROOT / "mcp-server.json"
def _ensure_project_root_on_path() -> None:
root = str(PROJECT_ROOT)
if root not in sys.path:
sys.path.insert(0, root)
def _load_payload(payload_path: Path) -> Any:
raw_text = payload_path.read_text(encoding="utf-8").strip()
if not raw_text:
raise ValueError(f"payload 文件为空: {payload_path}")
return json.loads(raw_text)
def _bootstrap_env_from_mcp_server() -> None:
if not MCP_SERVER_PATH.exists():
return
data = json.loads(MCP_SERVER_PATH.read_text(encoding="utf-8"))
env_map = data.get("mcpServers", {}).get("k3cloud-mcp", {}).get("env", {})
for key, value in env_map.items():
if key not in os.environ and value is not None:
os.environ[key] = str(value)
def main() -> None:
parser = argparse.ArgumentParser(
description="读取同级 demo.json 并直接调用 k3cloud_mcp.client.save()"
)
parser.add_argument("formid", help="K3Cloud 表单标识,例如 BD_Customer")
parser.add_argument(
"--payload",
default=str(DEFAULT_PAYLOAD_PATH),
help="payload JSON 文件路径,默认读取同级 demo.json",
)
parser.add_argument(
"--timeout",
type=int,
default=30,
help="请求超时时间,单位秒,默认 30",
)
args = parser.parse_args()
payload_path = Path(args.payload).expanduser().resolve()
_ensure_project_root_on_path()
_bootstrap_env_from_mcp_server()
from k3cloud_mcp.client import save
result = save(
formid=args.formid,
data_payload=_load_payload(payload_path),
timeout=args.timeout,
)
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,46 @@
import asyncio
import logging
import os
from typing import Sequence
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .tools import handle_call_tool, tools
logging.basicConfig(
level=getattr(logging, os.getenv("LOG_LEVEL", "INFO"), logging.INFO),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
async def serve() -> None:
server = Server("k3cloud_mcp")
@server.list_tools()
async def list_tools() -> list[Tool]:
return tools
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
return await handle_call_tool(name, arguments)
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main() -> None:
try:
asyncio.run(serve())
except KeyboardInterrupt:
logger.info("Server interrupted by user")
except Exception as exc:
logger.error("Server runtime error: %s", exc)
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,103 @@
import base64
import hashlib
import hmac
import random
import time
from typing import Dict
from urllib.parse import quote
from .config import Config
def _sdk_base64_encode(data_bytes: bytes) -> str:
return base64.b64encode(data_bytes).decode("utf-8")
def _sdk_base64_decode(data_str: str) -> bytes:
return base64.b64decode(data_str)
def _sdk_hmac_sha256(content: str, sign_key: str) -> str:
signature = hmac.new(
sign_key.encode("utf-8"),
content.encode("utf-8"),
hashlib.sha256,
).digest()
sign_hex = signature.hex()
return _sdk_base64_encode(sign_hex.encode("utf-8"))
def _rot(value: str) -> str:
def encode_char(ch: str) -> str:
if ch.islower():
return chr((ord(ch) - 97 + 13) % 26 + 97)
if ch.isupper():
return chr((ord(ch) - 65 + 13) % 26 + 65)
return ch
return "".join(encode_char(char) for char in value)
def _generate_code() -> str:
rand = str(random.randint(1000, 9999))
return f"0054s397{rand[0]}p6234378{rand[1]}o09pn7q3{rand[2]}r5qropr7{rand[3]}"
def _extend_byte_array(origin: str, extend_type: int = 0) -> bytearray:
if extend_type == 0:
return bytearray(_rot(origin), encoding="utf-8")
gene_str = "".join(origin[index * 9 : index * 9 + 8] for index in range(4))
return bytearray(_rot(gene_str), encoding="utf-8")
def _xor_code(byte_array: bytearray) -> bytearray:
pwd_array = _extend_byte_array(_generate_code(), extend_type=1)
return bytearray(byte ^ pwd_array[index] for index, byte in enumerate(byte_array))
def _decode_app_secret(encoded_secret: str) -> str:
if len(encoded_secret) != 32:
return ""
base64_decode = _sdk_base64_decode(encoded_secret)
base64_xor = _xor_code(bytearray(base64_decode))
return _sdk_base64_encode(base64_xor)
def build_headers(service_path: str) -> Dict[str, str]:
"""Build K3Cloud request headers for the target service path."""
parts = Config.APP_ID.split("_")
if len(parts) == 2:
client_id = parts[0]
client_sec = _decode_app_secret(parts[1])
else:
client_id = Config.APP_ID
client_sec = ""
timestamp = str(int(time.time()))
nonce = str(int(time.time()))
path_encoded = quote(service_path, encoding="utf-8").replace("/", "%2F")
api_sign_str = f"POST\n{path_encoded}\n\nx-api-nonce:{nonce}\nx-api-timestamp:{timestamp}\n"
api_signature = _sdk_hmac_sha256(api_sign_str, client_sec)
app_data_str = (
f"{Config.ACCT_ID},{Config.USERNAME},{Config.LCID},{Config.ORG_NUM}"
)
app_data_b64 = _sdk_base64_encode(app_data_str.encode("utf-8"))
kd_signature = _sdk_hmac_sha256(
Config.APP_ID + app_data_str,
Config.APP_SECRET,
)
return {
"Content-Type": "application/json",
"X-Api-ClientID": client_id,
"X-Api-Auth-Version": "2.0",
"X-Api-Timestamp": timestamp,
"X-Api-Nonce": nonce,
"X-Api-SignHeaders": "x-api-timestamp,x-api-nonce",
"X-Api-Signature": api_signature,
"X-KD-AppKey": Config.APP_ID,
"X-KD-AppData": app_data_b64,
"X-KD-Signature": kd_signature,
}

View File

@@ -0,0 +1,75 @@
import json
from typing import Any, Dict, List
from mcp.types import TextContent, Tool
from .client import save
tools = [
Tool(
name="save_form",
description="调用金蝶云星空 DynamicFormService.Save 接口保存单据。",
inputSchema={
"type": "object",
"properties": {
"formid": {
"type": "string",
"description": "业务对象表单 Id例如 SAL_QUOTATION",
},
"data_payload": {
"description": "Save 接口的 data 参数,支持 JSON 对象或 JSON 字符串",
},
"timeout": {
"type": "integer",
"description": "HTTP 超时时间(秒),默认 30",
"default": 30,
"minimum": 1,
},
},
"required": ["formid", "data_payload"],
},
)
]
def _normalize_payload(value: object) -> Any:
if value is None:
raise ValueError("missing data_payload")
if isinstance(value, str):
normalized = value.strip()
if not normalized:
raise ValueError("missing data_payload")
try:
return json.loads(normalized)
except json.JSONDecodeError:
return normalized
return value
async def handle_call_tool(name: str, arguments: Dict[str, object]) -> List[TextContent]:
try:
if name != "save_form":
raise ValueError(f"unknown tool name: {name}")
formid = str(arguments.get("formid", "")).strip()
if not formid:
raise ValueError("missing formid")
timeout_value = arguments.get("timeout", 30)
timeout = int(timeout_value)
if timeout <= 0:
raise ValueError("timeout must be greater than 0")
result = save(
formid=formid,
data_payload=_normalize_payload(arguments.get("data_payload")),
timeout=timeout,
)
return [
TextContent(
type="text",
text=json.dumps(result, ensure_ascii=False, indent=2),
)
]
except Exception as exc:
return [TextContent(type="text", text=f"Failed to call tool {name}: {exc}")]

View File

@@ -0,0 +1,22 @@
{
"mcpServers": {
"k3cloud-mcp": {
"disabled": false,
"type": "stdio",
"timeout": 30,
"command": "uvx",
"args": [
"k3cloud-mcp"
],
"env": {
"K3CLOUD_BASE_URL": "http://218.30.128.86:5366/k3cloud/",
"K3CLOUD_ACCT_ID": "69c1d4c23b97b0",
"K3CLOUD_APP_ID": "339175_429p68CF1rD+QVVG012K0/+H1L581Dno",
"K3CLOUD_USERNAME": "杜长远",
"K3CLOUD_APP_SECRET": "05bf79c2636a4bfa8063c0f1742ceeb1",
"K3CLOUD_LCID": "2052",
"K3CLOUD_ORG_NUM": "0"
}
}
}
}

View File

@@ -0,0 +1,19 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "k3cloud-mcp"
version = "0.1.0"
description = "K3Cloud MCP server"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.6.0",
"requests>=2.25.0"
]
[project.scripts]
k3cloud-mcp = "k3cloud_mcp.main:main"
[tool.hatch.build.targets.wheel]
packages = ["k3cloud_mcp"]

View File

@@ -1,199 +0,0 @@
{
"schema": "2.0",
"config": {
"update_multi": true,
"style": {
"text_size": {
"normal_v2": {
"default": "normal",
"pc": "normal",
"mobile": "heading"
}
}
}
},
"body": {
"direction": "vertical",
"elements": [
{
"tag": "form",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>确认单号:</font>** <font color='grey'>${order_number}</font>",
"text_align": "left",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>用户:</font>** <person id=${user_id} show_name=true show_avatar=true style='normal'></person>",
"text_align": "left",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>发生时间:</font>** <font color='grey'>${change_time}</font>",
"text_align": "left",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>\\*请准确选择关于您的变动项:</font>**",
"text_align": "left",
"text_size": "normal",
"margin": "0px 0px 8px 0px"
},
{
"tag": "multi_select_static",
"placeholder": {
"tag": "plain_text",
"content": "请选择资产变动项"
},
"options": "${asset_list}",
"type": "default",
"width": "fill",
"required": false,
"name": "input_assets",
"margin": "0px 0px 16px 0px",
"element_id": "cIiptD7Z4hCtAeR5Rb0b"
},
{
"tag": "hr",
"margin": "0px 0px 0px 0px"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>其他说明:</font>**",
"text_align": "left",
"text_size": "normal_v2",
"margin": "0px 0px 0px 0px"
},
{
"tag": "input",
"placeholder": {
"tag": "plain_text",
"content": "请输入"
},
"default_value": "",
"width": "fill",
"name": "input_remark",
"margin": "0px 0px 0px 0px"
},
{
"tag": "column_set",
"flex_mode": "flow",
"horizontal_spacing": "8px",
"horizontal_align": "right",
"columns": [
{
"tag": "column",
"width": "auto",
"elements": [
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "确认"
},
"type": "primary_filled",
"width": "default",
"size": "medium",
"behaviors": [
{
"type": "callback",
"value": {
"action": "card.action.trigger"
}
}
],
"form_action_type": "submit",
"name": "confirm_button",
"margin": "4px 0px 4px 0px"
}
],
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top"
},
{
"tag": "column",
"width": "auto",
"elements": [
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "反馈问题"
},
"type": "default",
"width": "default",
"confirm": {
"title": {
"tag": "plain_text",
"content": "反馈误报并废除该确认单吗?"
},
"text": {
"tag": "plain_text",
"content": "${remark}"
}
},
"behaviors": [
{
"type": "callback",
"value": {
"action": "card.action.trigger"
}
}
],
"form_action_type": "submit",
"name": "feedback_button",
"margin": "4px 0px 4px 0px"
}
],
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top"
}
]
}
],
"direction": "vertical",
"horizontal_align": "left",
"vertical_align": "top",
"padding": "12px 12px 12px 12px",
"margin": "0px 0px 0px 0px",
"name": "asset_confirmation_form"
},
{
"tag": "hr",
"margin": "0px 0px 0px 0px"
}
]
},
"header": {
"title": {
"tag": "plain_text",
"content": "资产变动单"
},
"subtitle": {
"tag": "plain_text",
"content": ""
},
"text_tag_list": [
{
"tag": "text_tag",
"text": {
"tag": "plain_text",
"content": "待确认"
},
"color": "orange"
}
],
"template": "blue",
"icon": {
"tag": "standard_icon",
"token": "googledrive_outlined"
},
"padding": "12px 8px 12px 8px"
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "lzwcai-lark-mcp"
version = "0.1.3"
version = "0.1.17"
description = "Lark MCP server"
requires-python = ">=3.10"
dependencies = [

View File

@@ -1,7 +1,9 @@
import asyncio
import json
import os
import sys
import types
from pathlib import Path
import requests
def main() -> None:
@@ -9,33 +11,72 @@ def main() -> None:
config_path = Path(__file__).with_name("mcp-server.json")
if config_path.exists():
config_data = json.loads(config_path.read_text(encoding="utf-8"))
servers = config_data.get("mcpServers", {})
env_data = (
config_data.get("mcpServers", {})
.get("lzwcai-mcpskills-lark-mcp", {})
.get("env", {})
servers.get("lzwcai-lark-mcp", {}).get("env", {})
or servers.get("lzwcai-mcpskills-lark-mcp", {}).get("env", {})
)
if env_data.get("app_id") and env_data.get("app_secret"):
os.environ["app_id"] = env_data["app_id"]
os.environ["app_secret"] = env_data["app_secret"]
if not os.getenv("app_id") or not os.getenv("app_secret"):
raise RuntimeError("missing app_id or app_secret")
from lzwcai_lark_mcp.main import LarkMcpServer
from lzwcai_lark_mcp.tools import send_asset_confirmation_card
async def _run() -> None:
server = LarkMcpServer()
await server.ensure_token()
user_id = "843ga2gb"
result = send_asset_confirmation_card(
server.tenant_access_token or "",
user_id,
"2026-02-13 10:30:00",
["华为i手机"],
["红米手机"]
)
print(result)
asyncio.run(_run())
if "mcp" not in sys.modules:
mcp_module = types.ModuleType("mcp")
types_module = types.ModuleType("mcp.types")
class Tool:
def __init__(self, *args, **kwargs):
pass
class TextContent:
def __init__(self, *args, **kwargs):
pass
types_module.Tool = Tool
types_module.TextContent = TextContent
mcp_module.types = types_module
sys.modules["mcp"] = mcp_module
sys.modules["mcp.types"] = types_module
from lzwcai_lark_mcp.tools import send_notion_card, send_stranger_card
app_id = os.getenv("app_id", "")
app_secret = os.getenv("app_secret", "")
auth_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
response = requests.post(
auth_url,
json={"app_id": app_id, "app_secret": app_secret},
headers={"Content-Type": "application/json"},
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get("code") not in (0, None):
raise RuntimeError(f"lark auth failed: {data}")
token = data.get("tenant_access_token", "")
if not token:
raise RuntimeError(f"lark auth response missing token: {data}")
user_id = "gegg1d78"
receiver_ids = ["843ga2gb", "gegg1d78"]
person_id = "gegg1d78"
image_key = "img_v3_0210i_94bdf5de-5c89-49f0-a793-c504c7377c7g"
card_message_ids = send_notion_card(
token,
receiver_ids,
person_id,
image_key
)
print(card_message_ids)
result = send_stranger_card(
token,
user_id,
"CONF-20260301-001",
"2026-03-01 10:30:00",
[
{"华为手机": "huawei_phone"},
{"红米手机": "redmi_phone"}
],
face_cap="img_v3_02vj_b25b040f-b6c1-49f4-a29d-a02c99a13a9g",
user_ids=["347f5e71", "gegg1d78"],
remark="如有误报请点击反馈"
)
print(result)
if __name__ == "__main__":

View File

@@ -0,0 +1,119 @@
import lark_oapi as lark
from lark_oapi.api.im.v1 import *
import json
import os
from datetime import datetime
# 配置你的 App ID 和 App Secret
APP_ID = 'cli_a8d0e0c140169013'
APP_SECRET = 'yEc0E8Aoo8Mo9NPPzphidez51xB71HXW'
# 你的 Open ID (请确保这个 ID 是正确的)
RECEIVE_ID = "ou_5c041720bc5a15235d6026ef118d77c9"
RECEIVE_ID_TYPE = "open_id"
# 卡片 JSON 文件路径
CARD_JSON_PATH = r"/home/lzwc/project/warehouse/origin_scripts/卡片源代码(供参考,禁止直接改动).json"
def load_and_render_card():
# 1. 读取 JSON 文件
with open(CARD_JSON_PATH, "r", encoding="utf-8") as f:
card_content = f.read()
# 2. 准备替换的数据
# 注意:简单的字符串替换无法处理 "${asset_list}" 这种需要替换为 JSON 数组的情况
# 所以我们需要先解析 JSON再遍历替换或者用更巧妙的方法
# 构造选项列表
asset_options = [
{"text": {"tag": "plain_text", "content": "显示器"}, "value": "monitor"},
{"text": {"tag": "plain_text", "content": "键盘"}, "value": "keyboard"},
{"text": {"tag": "plain_text", "content": "鼠标"}, "value": "mouse"}
]
# 这里我们采用一种混合策略:先替换简单的字符串变量,再解析 JSON 替换复杂对象
# 替换简单变量
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
card_content = card_content.replace("${order_number}", "ORD-TEST-001")
card_content = card_content.replace("${user_id}", RECEIVE_ID)
card_content = card_content.replace("${change_time}", current_time)
card_content = card_content.replace("${remark}", "如果不属实,请点击此按钮反馈")
# 解析为 Python 对象
card_json = json.loads(card_content)
# 3. 替换复杂对象 (options)
# 我们需要找到那个 multi_select_static 组件并替换它的 options
# 同时,我们需要将 order_number 注入到按钮的 value 中,以便回调时能获取到
order_number_val = "ORD-TEST-001"
try:
# 递归查找并替换 options="${asset_list}" 以及注入 order_number
def process_nodes(node):
if isinstance(node, dict):
# Check for options replacement
for key, value in node.items():
if key == "options" and value == "${asset_list}":
node[key] = asset_options
# Check for button behaviors
if node.get("tag") == "button":
behaviors = node.get("behaviors", [])
for behavior in behaviors:
if behavior.get("type") == "callback" and "value" in behavior:
# Inject order_number into the callback value
if isinstance(behavior["value"], dict):
behavior["value"]["order_number"] = order_number_val
# Recursively process children
for key, value in node.items():
process_nodes(value)
elif isinstance(node, list):
for item in node:
process_nodes(item)
process_nodes(card_json)
except Exception as e:
print(f"替换变量失败: {e}")
return None
return card_json
def main():
# 加载并渲染卡片
card_json = load_and_render_card()
if not card_json:
return
# 创建 Client
client = lark.Client.builder() \
.app_id(APP_ID) \
.app_secret(APP_SECRET) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 构造请求
request = CreateMessageRequest.builder() \
.receive_id_type(RECEIVE_ID_TYPE) \
.request_body(CreateMessageRequestBody.builder()
.receive_id(RECEIVE_ID)
.msg_type("interactive")
.content(json.dumps(card_json)) # 这里再次序列化为字符串
.build()) \
.build()
# 发送请求
response = client.im.v1.message.create(request)
# 处理响应
if not response.success():
print(f"发送失败: code: {response.code}, msg: {response.msg}, error: {response.error}")
return
print(f"发送成功! message_id: {response.data.message_id}")
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,222 @@
from typing import Sequence
import logging
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .mcp_mqtt import get_mcpmqtt_handler
from .nav_server import NavServer
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def serve() -> None:
server = Server("terminal_temi_mcp")
mmhandler = get_mcpmqtt_handler()
nav_server = NavServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
Tool(
name="recharge",
description="轮足机器人充电",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="terminate",
description="轮足机器人终止当前任务",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="goto",
description="轮足机器人导航到指定地点为用户引路。触发关键词:带我去、导航、引路、带路、怎么走、在哪里。",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "目标地点名称",
"minLength": 1
}
},
"required": ["location"]
}
),
Tool(
name="speak",
description="轮足机器人进行语音播报:告诉、提醒、告知、提示、通知",
inputSchema={
"type": "object",
"properties": {
"speech": {
"type": "string",
"description": "要播报的语音内容",
"minLength": 1
}
},
"required": ["speech"]
}
),
Tool(
name="reception",
description="轮足机器人去接待客人:去接人、请迎接客人、去接待、迎接一下、带人过来",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "在这个位置接待贵宾",
"minLength": 1
},
"name": {
"type": "string",
"description": "客人姓名",
"minLength": 1
},
"destination": {
"type": "string",
"description": "接待贵宾到这个位置",
"minLength": 1
}
},
"required": ["location", "name", "destination"]
}
),
Tool(
name="notification",
description="轮足机器人去指定地点播放通知:通知、去那里说、去某地播报、传达消息",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "机器人需要前往的目标地点名称",
"minLength": 1
},
"text": {
"type": "string",
"description": "到达地点后需要播放的文本内容",
"minLength": 1
}
},
"required": ["location", "text"]
}
),
Tool(
name="repose",
description="轮足机器人、助手、机器人去重新定位",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="patrol",
description="轮足机器人、助手、机器人去巡逻:巡逻、巡查、去检查一下、去看看、去巡视。支持按路线巡逻或随机巡逻。",
inputSchema={
"type": "object",
"properties": {
"locations": {
"type": "array",
"description": "机器人巡逻经过的地点列表。如果不提供,则默认进行随机巡逻。",
"items": {
"type": "string"
}
},
"flag": {
"type": "boolean",
"description": "是否随机巡逻。True为随机巡逻False为按locations指定的路线巡逻。"
}
},
"required": []
}
),
Tool(
name="dance",
description="轮足机器人跳舞",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
result = ""
if name == "recharge":
result = await nav_server.recharge()
elif name == "terminate":
result = await nav_server.terminate()
elif name == "goto":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.goto(
location=arguments["location"]
)
elif name == "speak":
if "speech" not in arguments:
raise ValueError("缺少必要参数: speech")
result = await nav_server.speak(
speech=arguments["speech"]
)
elif name == "reception":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.reception(
location=arguments.get("location", "前台"),
name=arguments.get("name", "贵宾"),
destination=arguments.get("destination", "会议室")
)
elif name == "notification":
if "location" not in arguments or "text" not in arguments:
raise ValueError("缺少必要参数: location or text")
result = await nav_server.notification(
location=arguments["location"],
text=arguments["text"]
)
elif name == "repose":
result = await nav_server.repose()
elif name == "dance":
result = await nav_server.dance()
elif name == "patrol":
locations = arguments.get("locations", [])
flag = arguments.get("flag", True if not locations else False)
result = await nav_server.patrol(
locations=locations,
flag=flag
)
else:
raise ValueError(f"未知工具: {name}")
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"工具调用失败: {str(e)}")
raise ValueError(f"执行失败: {str(e)}")
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main():
asyncio.run(serve())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,113 @@
import paho.mqtt.client as mqtt
import json
import logging
import threading
from typing import Optional
import uuid
import threading
import requests
from os import getenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MQTT_CLIENT_ID = f"MCPMQTT-{uuid.uuid4().hex[:8]}"
def getConfig_url(array):
url = "http://lzwcai-demp-corp-manager:8086/system/config/getConfig"
data = [array]
try:
response = requests.post(url, json=data, timeout=5)
response.raise_for_status()
data = response.json()['data']
return data[0]['configValue']
except Exception as e:
print(f"Error fetching config for {array}: {e}")
return None
class MQTTHandler:
def __init__(self):
self.client = mqtt.Client()
self.tasks = {} # {task_id: task_data}
self._lock = threading.Lock()
# 获取MQTT配置提供默认值
self.mqtt_username = getenv('MQTT_USERNAME') or 'lzwc'
self.mqtt_password = getenv('MQTT_PASSWORD') or 'Lzwc@4187.'
mqtt_broker_raw = getenv('MQTT_BROKER') or 'emqx'
# 移除协议前缀,只保留主机名
self.mqtt_broker = mqtt_broker_raw.replace('tcp://', '').replace('mqtt://', '')
mqtt_port_str = getenv('MQTT_PORT')
self.mqtt_port = int(mqtt_port_str) if mqtt_port_str else 1883
# 记录配置信息
logger.info(f"MQTT配置 - Broker: {self.mqtt_broker}, Port: {self.mqtt_port}, Username: {self.mqtt_username}")
if not getenv('MQTT_BROKER'):
logger.warning("MQTT_BROKER环境变量未设置使用默认值")
if not mqtt_port_str:
logger.warning("MQTT_PORT环境变量未设置使用默认端口1883")
self.client.username_pw_set(self.mqtt_username, self.mqtt_password)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
try:
logger.info(f"正在连接 MQTT 代理: {self.mqtt_broker}:{self.mqtt_port}")
self.client.connect(self.mqtt_broker, self.mqtt_port, 60)
self.client.loop_start()
except Exception as e:
logger.error(f"连接 MQTT 失败: {e}")
def _on_connect(self, client, userdata, flags, rc):
"""MQTT 连接回调"""
if rc == 0:
logger.info("已成功连接到 MQTT 代理服务器")
else:
logger.error(f"连接 MQTT 代理服务器失败,返回码: {rc}")
def get_status(self, task_id: str = None):
"""获取任务状态
Args:
task_id: 可选参数指定任务ID。如果为None则返回所有任务状态
Returns:
如果指定task_id则返回该任务的状态否则返回所有任务状态
"""
with self._lock:
if task_id:
return self.tasks.get(task_id, {}).copy()
return {k: v.copy() for k, v in self.tasks.items()}
def _on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
logger.info(f"[MQTT] topic={msg.topic} msg: {payload}")
# task_id = payload.get('task_id')
# if task_id and task_id in self.tasks:
# with self._lock:
# self.tasks[task_id].update({
# 'status': payload.get('status', 'UNKNOWN'),
# 'description': payload.get('description', '')
# })
except Exception as e:
logger.error(f"[错误] 处理消息失败: {e}")
def _on_disconnect(self, client, userdata, rc):
"""MQTT 断开连接回调"""
if rc != 0:
logger.warning("意外断开与 MQTT 代理服务器的连接。")
try:
logger.info("尝试重新连接 MQTT...")
client.reconnect()
except Exception as e:
logger.error(f"重新连接 MQTT 失败: {e}")
# 单例模式
_instance: Optional[MQTTHandler] = None
def get_mcpmqtt_handler() -> MQTTHandler:
"""获取单例"""
global _instance
if _instance is None:
_instance = MQTTHandler()
return _instance

View File

@@ -0,0 +1,142 @@
from typing import Optional, Sequence, List, Dict, Any
import logging
import json
from .mcp_mqtt import get_mcpmqtt_handler
logger = logging.getLogger(__name__)
class NavServer:
def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mcpmqtt_handler()
async def pub_cmd(self, device_id: str, action: str, params: Dict[str, Any]):
"""
发送MQTT命令
:param device_id: 设备ID
:param action: 动作指令 (对应 Kotlin 中的 action/cmd/type)
:param params: 其他参数 (将被合并到根 JSON 对象中)
"""
try:
payload = {
"device_id": device_id,
"action": action
}
if params:
payload.update(params)
logger.info(f"Publishing command: {action}, payload: {payload}")
self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2)
return f"{action} 任务已经下达完成"
except Exception as e:
logger.error(f"Failed to publish command: {str(e)}", exc_info=True)
return f"Failed to publish command: {str(e)}"
async def recharge(self):
"""轮足机器人充电"""
try:
params = {}
return await self.pub_cmd("temi-test", "recharge", params)
except Exception as e:
logger.error(f"Failed to call recharge mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call recharge mcp-tool: {str(e)}"
async def terminate(self):
"""轮足机器人终止当前任务"""
try:
params = {}
return await self.pub_cmd("temi-test", "terminate", params)
except Exception as e:
logger.error(f"Failed to call terminate mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call terminate mcp-tool: {str(e)}"
async def goto(self, location: str, flag: bool = False):
"""轮足导航到指定位置"""
try:
if not location:
return "Location is not specified."
# Kotlin 端对应 action: "goto"
# Kotlin 端参数: location (or target)
params = {
"location": location
}
return await self.pub_cmd("temi-test", "goto", params)
except Exception as e:
logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call navigation mcp-tool: {str(e)}"
async def speak(self, speech: str):
"""轮足机器人语音播报"""
try:
if not speech:
return "Speech content is not specified."
params = {
"text": speech,
"lang": "zh"
}
return await self.pub_cmd("temi-test", "speak", params)
except Exception as e:
logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call speak mcp-tool: {str(e)}"
async def reception(self, location: str = "前台", name: str = "贵宾", destination: str = "会议室"):
"""轮足机器人移动到指定位置迎宾"""
try:
params = {
"location": location,
"text": f"您好,{name},我是接待机器人。",
"destination": destination
}
return await self.pub_cmd("temi-test", "reception", params)
except Exception as e:
logger.error(f"Failed to call reception mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call reception mcp-tool: {str(e)}"
async def notification(self, location: str, text: str):
"""轮足机器人移动到指定位置并播放通知文本"""
try:
if not location or not text:
return "Location or text is not specified."
params = {
"location": location,
"text": text
}
return await self.pub_cmd("temi-test", "notification", params)
except Exception as e:
logger.error(f"Failed to call notification mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call notification mcp-tool: {str(e)}"
async def repose(self):
"""轮足机器人重新定位"""
try:
params = {}
return await self.pub_cmd("temi-test", "repose", params)
except Exception as e:
logger.error(f"Failed to call repose mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call repose mcp-tool: {str(e)}"
async def dance(self):
"""轮足机器人跳舞"""
try:
params = {}
return await self.pub_cmd("temi-test", "dance", params)
except Exception as e:
logger.error(f"Failed to call dance mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call dance mcp-tool: {str(e)}"
async def patrol(self, locations: list = None, flag: bool = False):
"""轮足机器人巡逻 """
try:
params = {
"flag": True,
"locations": locations or []
}
return await self.pub_cmd("temi-test", "patrol", params)
except Exception as e:
logger.error(f"Failed to call patrol mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call patrol mcp-tool: {str(e)}"

View File

@@ -0,0 +1,13 @@
{
"mcpServers": {
"lzwcai_temi_mcp": {
"disabled": false,
"type": "stdio",
"timeout": 30,
"command": "uvx",
"args": [
"lzwcai_temi_mcp"
]
}
}
}

View File

@@ -0,0 +1,26 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "lzwcai_temi_mcp"
version = "0.1.12"
description = "MQTT-based navigation server for robot"
requires-python = ">=3.10"
dependencies = [
"fastapi>=0.95.0",
"uvicorn>=0.21.0",
"paho-mqtt>=2.0.0",
"pydantic>=1.10.0",
"python-dotenv>=0.21.0",
"mcp[cli]>=1.6.0",
"requests>=2.25.0"
]
[project.scripts]
lzwcai_temi_mcp = "lzwcai_temi_mcp.main:main"
[tool.hatch.build.targets.wheel]
packages = ["lzwcai_temi_mcp"]

17
mcp.json Normal file
View File

@@ -0,0 +1,17 @@
{
"mcpServers": {
"lark-mcp": {
"command": "npx",
"args": [
"-y",
"@larksuiteoapi/lark-mcp",
"mcp",
"-a",
"cli_a8d0e0c140169013",
"-s",
"yEc0E8Aoo8Mo9NPPzphidez51xB71HXW"
]
}
}
}