Files
lzwcai-mcp/lzwcai_temi_mcp/lzwcai_temi_mcp/main.py
bin 3d04166314 feat: 新增轮足机器人导航MCP服务器
添加 lzwcai_temi_mcp 包,实现基于MQTT的轮足机器人导航控制服务器。包含以下功能:
- 通过 MQTT 发布机器人控制命令
- 提供 goto、speak、reception、repose、patrol 等导航工具
- 支持通过 MCP 协议与AI助手集成
- 更新 lzwcai_lark_mcp 版本至 0.1.11
2026-03-15 18:36:34 +08:00

152 lines
5.5 KiB
Python

from typing import Sequence
import logging
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .mcp_mqtt import get_mcpmqtt_handler
from .nav_server import NavServer
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def serve() -> None:
server = Server("terminal_temi_mcp")
mmhandler = get_mcpmqtt_handler()
nav_server = NavServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
Tool(
name="goto",
description="轮足机器人导航到指定地点为用户引路。触发关键词:带我去、导航、引路、带路、怎么走、在哪里。",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "目标地点名称",
"minLength": 1
}
},
"required": ["location"]
}
),
Tool(
name="speak",
description="轮足机器人进行语音播报:告诉、提醒、告知、提示、通知",
inputSchema={
"type": "object",
"properties": {
"speech": {
"type": "string",
"description": "要播报的语音内容",
"minLength": 1
}
},
"required": ["speech"]
}
),
Tool(
name="reception",
description="轮足机器人去接待客人:去接人、请迎接客人、去接待、迎接一下、带人过来",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "引导接待客人到这个位置",
"minLength": 1
},
"name": {
"type": "string",
"description": "客人姓名",
"minLength": 1
}
},
"required": ["location", "name"]
}
),
Tool(
name="repose",
description="轮足机器人、助手、机器人去重新定位",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="patrol",
description="轮足机器人、助手、机器人去巡逻:巡逻、巡查、去检查一下、去看看、去巡视",
inputSchema={
"type": "object",
"properties": {
"locations": {
"type": "array",
"description": "机器人巡逻经过的地点列表",
"items": {
"type": "string"
}
}
},
"required": ["locations"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
result = ""
if name == "goto":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.goto(
location=arguments["location"]
)
elif name == "speak":
if "speech" not in arguments:
raise ValueError("缺少必要参数: speech")
result = await nav_server.speak(
speech=arguments["speech"]
)
elif name == "reception":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.reception(
location=arguments["location"],
name=arguments.get("name", "贵宾")
)
elif name == "repose":
result = await nav_server.repose()
elif name == "patrol":
if "locations" not in arguments:
raise ValueError("缺少必要参数: locations")
result = await nav_server.patrol(
locations=arguments["locations"]
)
else:
raise ValueError(f"未知工具: {name}")
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"工具调用失败: {str(e)}")
raise ValueError(f"执行失败: {str(e)}")
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main():
asyncio.run(serve())
if __name__ == "__main__":
main()