diff --git a/terminal_go2_mcp/README.md b/terminal_go2_mcp/README.md new file mode 100644 index 0000000..7348161 --- /dev/null +++ b/terminal_go2_mcp/README.md @@ -0,0 +1,31 @@ +# Terminal Go2 MCP Server + +This project provides a server that uses the Machine Control Protocol (MCP) to interact with a Go2 robot via MQTT. It exposes the robot's capabilities, such as navigation and speech, as tools that can be called by an MCP client. + +## Features + +- **MQTT Integration:** Communicates with the robot over MQTT for robust and real-time command execution. +- **MCP Tooling:** Exposes robot actions as standardized MCP tools. +- **Core Functions:** + - `nav_to`: Navigate the robot to a specified location. + - `speak`: Make the robot say a given phrase. + - `custom_action`: Trigger a predefined custom action on the robot. + - `save_position`: Save the robot's current position with a name. + +## Installation + +To install the necessary dependencies, run: + +```bash +pip install -e . +``` + +## Usage + +To start the server, run the following command from the project's root directory: + +```bash +terminal_go2_mcp +``` + +The server will connect to the MQTT broker and expose the robot's functions as MCP tools. diff --git a/terminal_go2_mcp/mcp-server-go2.json b/terminal_go2_mcp/mcp-server-go2.json new file mode 100644 index 0000000..4340f89 --- /dev/null +++ b/terminal_go2_mcp/mcp-server-go2.json @@ -0,0 +1,17 @@ +{ + "mcpServers": { + "terminal-go2-mcp": { + "disabled": false, + "type": "stdio", + "timeout": 30, + "command": "uvx", + "args": [ + "terminal_go2_mcp" + ], + "env": { + "employeeId": "$employeeId$", + "userId": "$employeeId$" + } + } + } +} diff --git a/terminal_go2_mcp/pyproject.toml b/terminal_go2_mcp/pyproject.toml new file mode 100644 index 0000000..f26e0c8 --- /dev/null +++ b/terminal_go2_mcp/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "terminal_go2_mcp" +version = "0.0.1" +description = "MQTT-based navigation server for robot" +requires-python = ">=3.10" +dependencies = [ + "fastapi>=0.95.0", + "uvicorn>=0.21.0", + "paho-mqtt>=2.0.0", + "pydantic>=1.10.0", + "python-dotenv>=0.21.0", + "mcp[cli]>=1.6.0", +] + +[project.scripts] +terminal_go2_mcp = "terminal_go2_mcp.main:main" + + +[tool.hatch.build.targets.wheel] +packages = ["terminal_go2_mcp"] \ No newline at end of file diff --git a/terminal_go2_mcp/terminal_go2_mcp/__init__.py b/terminal_go2_mcp/terminal_go2_mcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/terminal_go2_mcp/terminal_go2_mcp/main.py b/terminal_go2_mcp/terminal_go2_mcp/main.py new file mode 100644 index 0000000..540c4d0 --- /dev/null +++ b/terminal_go2_mcp/terminal_go2_mcp/main.py @@ -0,0 +1,266 @@ +from typing import Optional, Sequence +import logging +from fastapi import FastAPI +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import Tool, TextContent +import asyncio +import json +import time +import uuid +from mcp_mqtt import get_mqtt_handler + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class TerminalGo2McpServer: + def __init__(self, mmhandler=None): + self.mmhandler = mmhandler or get_mqtt_handler() + + async def pubCmd(self, task_id: str, device_id: str, cmd: str, params: dict): + try: + payload = { + "task_id": task_id, + "device_id": device_id, + "cmd": cmd, + "data": params["data"] + } + self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2) + return "任务发布完成" + except Exception as e: + logger.error(f"Failed to publish {cmd} command: {str(e)}", exc_info=True) + raise + + async def nav_to(self, task_id: str, device_id: str, location: str): + """ + 机器人导航去某个地方。 + Args: + - task_id (str): 任务id + - device_id (str): 设备id + - location (str): 目标地点名称 + Returns: + str: 命令执行结果消息 + + Raises: + ValueError: 如果参数无效或为空 + Exception: 如果MQTT发布失败 + """ + if task_id and device_id and location: + try: + params = { + "data": {"location": location} + } + return await self.pubCmd(task_id, device_id, "nav", params) + except Exception as e: + logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True) + raise + + async def speak(self, task_id: str, device_id: str, speech: str): + """ + 机器人说某些话。 + Args: + - task_id (str): 任务id + - device_id (str): 设备id + - speech (str): 说话内容 + Returns: + str: 命令执行结果消息 + + Raises: + ValueError: 如果参数无效或为空 + Exception: 如果MQTT发布失败 + """ + if task_id and device_id and speech: + try: + params = { + "data": {"speech": speech} + } + return await self.pubCmd(task_id, device_id, "speak", params) + except Exception as e: + logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True) + raise + + async def custom_action(self, task_id: str, device_id: str, action: str): + """ + 机器人自定义动作展示。 + Args: + - task_id (str): 任务id + - device_id (str): 设备id + - action (str): 动作名称 + Returns: + str: 命令执行结果消息 + + Raises: + ValueError: 如果参数无效或为空 + Exception: 如果MQTT发布失败 + """ + if task_id and device_id and action: + try: + params = { + "data": { + "action": action + } + } + return await self.pubCmd(task_id, device_id, "action", params) + except Exception as e: + logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True) + raise + + async def save_position(self, task_id: str, device_id: str, location_name: str): + """ + 机器人保存具体地点。 + Args: + - task_id (str): 任务id + - device_id (str): 设备id + - location_name (str): 地点名称 + Returns: + str: 命令执行结果消息 + Raises: + ValueError: 如果参数无效或为空 + Exception: 如果MQTT发布失败 + """ + if task_id and device_id and location_name: + try: + params = { + "data": { + "location_name": location_name + } + } + return await self.pubCmd(task_id, device_id, "saveLocation", params) + except Exception as e: + logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True) + raise + + +async def serve() -> None: + server = Server("terminal_go2_mcp") + mmhandler = get_mqtt_handler() + mcp_server = TerminalGo2McpServer(mmhandler) + + @server.list_tools() + async def list_tools() -> list[Tool]: + """列出所有工具""" + return [ + Tool( + name="nav_to", + description="机器人导航去某个地方。", + inputSchema={ + "type": "object", + "properties": { + "task_id": { + "type": "string", + "description": "任务ID", + "minLength": 1 + }, + "device_id": { + "type": "string", + "description": "设备ID", + "minLength": 1 + }, + "location": { + "type": "string", + "description": "目标地点名称", + "minLength": 1 + } + }, + "required": ["task_id", "device_id", "location"] + } + ), + Tool( + name="speak", + description="机器人说某些话。", + inputSchema={ + "type": "object", + "properties": { + "task_id": { + "type": "string", + "description": "任务ID", + "minLength": 1 + }, + "device_id": { + "type": "string", + "description": "设备ID", + "minLength": 1 + }, + "speech": { + "type": "string", + "description": "说话内容", + "minLength": 1 + } + }, + "required": ["task_id", "device_id", "speech"] + } + ), + Tool( + name="custom_action", + description="机器人自定义动作展示。", + inputSchema={ + "type": "object", + "properties": { + "task_id": { + "type": "string", + "description": "任务ID", + "minLength": 1 + }, + "device_id": { + "type": "string", + "description": "设备ID", + "minLength": 1 + }, + "action": { + "type": "string", + "description": "动作名称", + "minLength": 1 + } + }, + "required": ["task_id", "device_id", "action"] + } + ) + ] + + @server.call_tool() + async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]: + """处理工具调用""" + try: + if name == "nav_to": + if "params" not in arguments: + raise ValueError("缺少必要参数: params") + result = await mcp_server.nav_to( + task_id=arguments["task_id"], + device_id=arguments["device_id"], + location=arguments["params"]["location"] + ) + elif name == "speak": + if "params" not in arguments: + raise ValueError("缺少必要参数: params") + result = await mcp_server.speak( + task_id=arguments["task_id"], + device_id=arguments["device_id"], + speech=arguments["params"]["speech"] + ) + elif name == "custom_action": + if "params" not in arguments: + raise ValueError("缺少必要参数: params") + result = await mcp_server.custom_action( + task_id=arguments["task_id"], + device_id=arguments["device_id"], + action=arguments["params"]["action"] + ) + else: + raise ValueError(f"不支持的工具名称: {name}") + return [TextContent(text=result)] + except Exception as e: + logger.error(f"Failed to call tool {name}: {str(e)}", exc_info=True) + raise + + options = server.create_initialization_options() + async with stdio_server() as (read_stream, write_stream): + await server.run(read_stream, write_stream, options) + + +if __name__ == "__main__": + asyncio.run(serve()) diff --git a/terminal_go2_mcp/terminal_go2_mcp/mcp_mqtt.py b/terminal_go2_mcp/terminal_go2_mcp/mcp_mqtt.py new file mode 100644 index 0000000..9b838b4 --- /dev/null +++ b/terminal_go2_mcp/terminal_go2_mcp/mcp_mqtt.py @@ -0,0 +1,88 @@ +import paho.mqtt.client as mqtt +import json +import logging +import time +import threading +from typing import Optional, Callable, Dict + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("MCPMQTT") + +MQTT_BROKER = '192.168.1.199' +MQTT_PORT = 1883 +MQTT_CLIENT_ID = "MCPMQTT" +MQTT_USERNAME = 'lzwc' +MQTT_PASSWORD = 'Lzwc@4187.' +RESPONSE_TOPIC = "unitree_response" + +class MCPMQTT: + def __init__(self): + self.client_ = mqtt.Client(client_id=MQTT_CLIENT_ID) + self.client_.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + self.client_.on_connect = self._on_connect + self.client_.on_message = self._on_message + self.client_.on_disconnect = self._on_disconnect + self._response_handlers = {} # 存储响应处理器 + self._lock = threading.Lock() # 线程锁 + self._response_topic = RESPONSE_TOPIC + + try: + logger.info(f"正在连接 MQTT 代理: {MQTT_BROKER}:{MQTT_PORT}") + self.client_.connect(MQTT_BROKER, MQTT_PORT, 60) + self.client_.loop_start() + except Exception as e: + logger.error(f"连接 MQTT 失败: {e}") + + def _on_connect(self, client, userdata, flags, rc): + """MQTT 连接回调""" + if rc == 0: + logger.info("已成功连接到 MQTT 代理服务器") + client.subscribe(self._response_topic, qos=2) + logger.info(f"已订阅响应主题: {self._response_topic}") + else: + logger.error(f"连接 MQTT 代理服务器失败,返回码: {rc}") + + def register_response_handler(self, correlation_id: str, callback: Callable): + """注册响应消息处理器""" + with self._lock: + self._response_handlers[correlation_id] = callback + + def _on_message(self, client, userdata, msg): + try: + payload = json.loads(msg.payload.decode()) + topic = msg.topic + logger.info(f"[MQTT] 收到消息 topic={topic}") + + # 处理响应消息 + if topic == self._response_topic and "correlation_id" in payload: + with self._lock: + handler = self._response_handlers.pop(payload["correlation_id"], None) + if handler: + handler(payload) + return + + # 打印常规消息 + logger.info(f"[MQTT] 消息内容: {payload}") + + except Exception as e: + logger.error(f"[错误] 处理消息失败: {e}") + + def _on_disconnect(self, client, userdata, rc): + """MQTT 断开连接回调""" + if rc != 0: + logger.warning("意外断开与 MQTT 代理服务器的连接。") + try: + logger.info("尝试重新连接 MQTT...") + client.reconnect() + except Exception as e: + logger.error(f"重新连接 MQTT 失败: {e}") + +# 单例模式 +_instance: Optional[MCPMQTT] = None + +def get_mqtt_handler() -> MCPMQTT: + """获取MCPMQTT单例""" + global _instance + if _instance is None: + _instance = MCPMQTT() + return _instance