Add initial implementation of Terminal Go2 MCP server with MQTT integration, core functionalities for robot navigation and speech, and comprehensive documentation. Includes configuration files and main application structure.

This commit is contained in:
2025-09-28 12:12:58 +08:00
parent 9b7b69d8c3
commit 924bce9554
6 changed files with 426 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
# Terminal Go2 MCP Server
This project provides a server that uses the Machine Control Protocol (MCP) to interact with a Go2 robot via MQTT. It exposes the robot's capabilities, such as navigation and speech, as tools that can be called by an MCP client.
## Features
- **MQTT Integration:** Communicates with the robot over MQTT for robust and real-time command execution.
- **MCP Tooling:** Exposes robot actions as standardized MCP tools.
- **Core Functions:**
- `nav_to`: Navigate the robot to a specified location.
- `speak`: Make the robot say a given phrase.
- `custom_action`: Trigger a predefined custom action on the robot.
- `save_position`: Save the robot's current position with a name.
## Installation
To install the necessary dependencies, run:
```bash
pip install -e .
```
## Usage
To start the server, run the following command from the project's root directory:
```bash
terminal_go2_mcp
```
The server will connect to the MQTT broker and expose the robot's functions as MCP tools.

View File

@@ -0,0 +1,17 @@
{
"mcpServers": {
"terminal-go2-mcp": {
"disabled": false,
"type": "stdio",
"timeout": 30,
"command": "uvx",
"args": [
"terminal_go2_mcp"
],
"env": {
"employeeId": "$employeeId$",
"userId": "$employeeId$"
}
}
}
}

View File

@@ -0,0 +1,24 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "terminal_go2_mcp"
version = "0.0.1"
description = "MQTT-based navigation server for robot"
requires-python = ">=3.10"
dependencies = [
"fastapi>=0.95.0",
"uvicorn>=0.21.0",
"paho-mqtt>=2.0.0",
"pydantic>=1.10.0",
"python-dotenv>=0.21.0",
"mcp[cli]>=1.6.0",
]
[project.scripts]
terminal_go2_mcp = "terminal_go2_mcp.main:main"
[tool.hatch.build.targets.wheel]
packages = ["terminal_go2_mcp"]

View File

@@ -0,0 +1,266 @@
from typing import Optional, Sequence
import logging
from fastapi import FastAPI
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
import time
import uuid
from mcp_mqtt import get_mqtt_handler
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TerminalGo2McpServer:
def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mqtt_handler()
async def pubCmd(self, task_id: str, device_id: str, cmd: str, params: dict):
try:
payload = {
"task_id": task_id,
"device_id": device_id,
"cmd": cmd,
"data": params["data"]
}
self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2)
return "任务发布完成"
except Exception as e:
logger.error(f"Failed to publish {cmd} command: {str(e)}", exc_info=True)
raise
async def nav_to(self, task_id: str, device_id: str, location: str):
"""
机器人导航去某个地方。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- location (str): 目标地点名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and location:
try:
params = {
"data": {"location": location}
}
return await self.pubCmd(task_id, device_id, "nav", params)
except Exception as e:
logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True)
raise
async def speak(self, task_id: str, device_id: str, speech: str):
"""
机器人说某些话。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- speech (str): 说话内容
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and speech:
try:
params = {
"data": {"speech": speech}
}
return await self.pubCmd(task_id, device_id, "speak", params)
except Exception as e:
logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True)
raise
async def custom_action(self, task_id: str, device_id: str, action: str):
"""
机器人自定义动作展示。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- action (str): 动作名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and action:
try:
params = {
"data": {
"action": action
}
}
return await self.pubCmd(task_id, device_id, "action", params)
except Exception as e:
logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True)
raise
async def save_position(self, task_id: str, device_id: str, location_name: str):
"""
机器人保存具体地点。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- location_name (str): 地点名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and location_name:
try:
params = {
"data": {
"location_name": location_name
}
}
return await self.pubCmd(task_id, device_id, "saveLocation", params)
except Exception as e:
logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True)
raise
async def serve() -> None:
server = Server("terminal_go2_mcp")
mmhandler = get_mqtt_handler()
mcp_server = TerminalGo2McpServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
Tool(
name="nav_to",
description="机器人导航去某个地方。",
inputSchema={
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "任务ID",
"minLength": 1
},
"device_id": {
"type": "string",
"description": "设备ID",
"minLength": 1
},
"location": {
"type": "string",
"description": "目标地点名称",
"minLength": 1
}
},
"required": ["task_id", "device_id", "location"]
}
),
Tool(
name="speak",
description="机器人说某些话。",
inputSchema={
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "任务ID",
"minLength": 1
},
"device_id": {
"type": "string",
"description": "设备ID",
"minLength": 1
},
"speech": {
"type": "string",
"description": "说话内容",
"minLength": 1
}
},
"required": ["task_id", "device_id", "speech"]
}
),
Tool(
name="custom_action",
description="机器人自定义动作展示。",
inputSchema={
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "任务ID",
"minLength": 1
},
"device_id": {
"type": "string",
"description": "设备ID",
"minLength": 1
},
"action": {
"type": "string",
"description": "动作名称",
"minLength": 1
}
},
"required": ["task_id", "device_id", "action"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
if name == "nav_to":
if "params" not in arguments:
raise ValueError("缺少必要参数: params")
result = await mcp_server.nav_to(
task_id=arguments["task_id"],
device_id=arguments["device_id"],
location=arguments["params"]["location"]
)
elif name == "speak":
if "params" not in arguments:
raise ValueError("缺少必要参数: params")
result = await mcp_server.speak(
task_id=arguments["task_id"],
device_id=arguments["device_id"],
speech=arguments["params"]["speech"]
)
elif name == "custom_action":
if "params" not in arguments:
raise ValueError("缺少必要参数: params")
result = await mcp_server.custom_action(
task_id=arguments["task_id"],
device_id=arguments["device_id"],
action=arguments["params"]["action"]
)
else:
raise ValueError(f"不支持的工具名称: {name}")
return [TextContent(text=result)]
except Exception as e:
logger.error(f"Failed to call tool {name}: {str(e)}", exc_info=True)
raise
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
if __name__ == "__main__":
asyncio.run(serve())

View File

@@ -0,0 +1,88 @@
import paho.mqtt.client as mqtt
import json
import logging
import time
import threading
from typing import Optional, Callable, Dict
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("MCPMQTT")
MQTT_BROKER = '192.168.1.199'
MQTT_PORT = 1883
MQTT_CLIENT_ID = "MCPMQTT"
MQTT_USERNAME = 'lzwc'
MQTT_PASSWORD = 'Lzwc@4187.'
RESPONSE_TOPIC = "unitree_response"
class MCPMQTT:
def __init__(self):
self.client_ = mqtt.Client(client_id=MQTT_CLIENT_ID)
self.client_.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
self.client_.on_connect = self._on_connect
self.client_.on_message = self._on_message
self.client_.on_disconnect = self._on_disconnect
self._response_handlers = {} # 存储响应处理器
self._lock = threading.Lock() # 线程锁
self._response_topic = RESPONSE_TOPIC
try:
logger.info(f"正在连接 MQTT 代理: {MQTT_BROKER}:{MQTT_PORT}")
self.client_.connect(MQTT_BROKER, MQTT_PORT, 60)
self.client_.loop_start()
except Exception as e:
logger.error(f"连接 MQTT 失败: {e}")
def _on_connect(self, client, userdata, flags, rc):
"""MQTT 连接回调"""
if rc == 0:
logger.info("已成功连接到 MQTT 代理服务器")
client.subscribe(self._response_topic, qos=2)
logger.info(f"已订阅响应主题: {self._response_topic}")
else:
logger.error(f"连接 MQTT 代理服务器失败,返回码: {rc}")
def register_response_handler(self, correlation_id: str, callback: Callable):
"""注册响应消息处理器"""
with self._lock:
self._response_handlers[correlation_id] = callback
def _on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
topic = msg.topic
logger.info(f"[MQTT] 收到消息 topic={topic}")
# 处理响应消息
if topic == self._response_topic and "correlation_id" in payload:
with self._lock:
handler = self._response_handlers.pop(payload["correlation_id"], None)
if handler:
handler(payload)
return
# 打印常规消息
logger.info(f"[MQTT] 消息内容: {payload}")
except Exception as e:
logger.error(f"[错误] 处理消息失败: {e}")
def _on_disconnect(self, client, userdata, rc):
"""MQTT 断开连接回调"""
if rc != 0:
logger.warning("意外断开与 MQTT 代理服务器的连接。")
try:
logger.info("尝试重新连接 MQTT...")
client.reconnect()
except Exception as e:
logger.error(f"重新连接 MQTT 失败: {e}")
# 单例模式
_instance: Optional[MCPMQTT] = None
def get_mqtt_handler() -> MCPMQTT:
"""获取MCPMQTT单例"""
global _instance
if _instance is None:
_instance = MCPMQTT()
return _instance