Files
lzwcai-mcp/terminal_go2_mcp/terminal_go2_mcp/main.py
Sucan126 ff39bdbd8a feat: 添加机器人模式切换和连续移动功能
- 新增switch_mode方法用于切换机器人运控模式
- 添加ContinuousMove方法实现机器人连续移动控制
- 更新工具调用处理以支持新功能
- 调整版本号至0.0.9
2025-10-09 19:33:14 +08:00

898 lines
34 KiB
Python

from typing import Optional, Sequence
import logging
from fastapi import FastAPI
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
import time
import uuid
from .mcp_mqtt import get_mcpmqtt_handler
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TerminalGo2McpServer:
def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mcpmqtt_handler()
async def pubCmd(self, cmd_type: str, cmd: str, params: dict):
try:
payload = {
"type": cmd_type,
"cmd": cmd,
"param": params
}
self.mmhandler.client.publish("unitree/go2/cmd", json.dumps(payload), qos=2)
return f"{cmd}任务已经下达完成"
except Exception as e:
logger.error(f"Failed to publish command: {str(e)}", exc_info=True)
return f"Failed to publish command: {str(e)}"
async def switch_mode(self, name: str):
"""
切换机器人模式。
Args:
- name (str): 模式名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if name:
try:
params = {"name": name}
return await self.pubCmd("msc", "SelectMode", params)
except Exception as e:
logger.error(f"Failed to call switch_mode mcp-tool: {str(e)} ", exc_info=True)
raise
async def custom_action(self, action: str):
"""
机器人自定义动作展示。
Args:
- action (str): 动作名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if action:
try:
return await self.pubCmd("sport", action, {})
except Exception as e:
logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True)
raise
async def ContinuousMove(self, on: bool, vx: float, vy: float, vyaw: float):
"""
开始连续移动。
Args:
- on (bool): 是否开始连续移动
- vx (float): x方向速度
- vy (float): y方向速度
- vyaw (float): 偏航角速度
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
try:
params = {"on": on, "vx": vx, "vy": vy, "vyaw": vyaw}
return await self.pubCmd("sport", "ContinuousMove", params)
except Exception as e:
logger.error(f"Failed to call ContinuousMove: {str(e)}", exc_info=True)
raise
# 新增OAC相关命令
async def obstacle_avoidance_switch(self, enable: bool):
"""
开启/关闭避障功能。
Args:
- enable (bool): 是否开启避障
Returns:
str: 命令执行结果消息
"""
try:
params = {"enable": enable}
return await self.pubCmd("oac", "SwitchSet", params)
except Exception as e:
logger.error(f"Failed to call obstacle_avoidance_switch: {str(e)}", exc_info=True)
raise
async def obstacle_move(self, x: float, y: float, yaw: float):
"""
避障移动。
Args:
- x (float): x坐标
- y (float): y坐标
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "Move", params)
except Exception as e:
logger.error(f"Failed to call obstacle_move: {str(e)}", exc_info=True)
raise
async def move_to_absolute_position(self, x: float, y: float, yaw: float):
"""
移动到绝对位置。
Args:
- x (float): x坐标
- y (float): y坐标
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "MoveToAbsolutePosition", params)
except Exception as e:
logger.error(f"Failed to call move_to_absolute_position: {str(e)}", exc_info=True)
raise
async def move_to_increment_position(self, x: float, y: float, yaw: float):
"""
移动到相对位置。
Args:
- x (float): x坐标增量
- y (float): y坐标增量
- yaw (float): 偏航角增量
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "MoveToIncrementPosition", params)
except Exception as e:
logger.error(f"Failed to call move_to_increment_position: {str(e)}", exc_info=True)
raise
# 新增导航相关命令
async def start_mapping(self):
"""
开始建图。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "start_mapping", params)
except Exception as e:
logger.error(f"Failed to call start_mapping: {str(e)}", exc_info=True)
raise
async def end_mapping(self, address: str):
"""
结束建图并保存地图。
Args:
- address (str): 地图保存地址
Returns:
str: 命令执行结果消息
"""
if address:
try:
params = {"address": address}
return await self.pubCmd("nav", "end_mapping", params)
except Exception as e:
logger.error(f"Failed to call end_mapping: {str(e)}", exc_info=True)
raise
async def initialize_pose(self, x: float, y: float, z: float,
q_x: float, q_y: float, q_z: float, q_w: float, address: str):
"""
初始化机器人位姿。
Args:
- x, y, z (float): 位置坐标
- q_x, q_y, q_z, q_w (float): 四元数姿态
- address (str): 地图地址
Returns:
str: 命令执行结果消息
"""
if address:
try:
params = {
"x": x, "y": y, "z": z,
"q_x": q_x, "q_y": q_y, "q_z": q_z, "q_w": q_w,
"address": address
}
return await self.pubCmd("nav", "initialize_pose", params)
except Exception as e:
logger.error(f"Failed to call initialize_pose: {str(e)}", exc_info=True)
raise
async def pose_navigation(self, target_x: float, target_y: float, target_z: float,
target_q_x: float, target_q_y: float, target_q_z: float, target_q_w: float,
mode: int, speed: float):
"""
位姿导航。
Args:
- target_x, target_y, target_z (float): 目标位置坐标
- target_q_x, target_q_y, target_q_z, target_q_w (float): 目标四元数姿态
- mode (int): 导航模式
- speed (float): 导航速度
Returns:
str: 命令执行结果消息
"""
try:
params = {
"targetPose": {
"x": target_x, "y": target_y, "z": target_z,
"q_x": target_q_x, "q_y": target_q_y, "q_z": target_q_z, "q_w": target_q_w
},
"mode": mode,
"speed": speed
}
return await self.pubCmd("nav", "pose_navigation", params)
except Exception as e:
logger.error(f"Failed to call pose_navigation: {str(e)}", exc_info=True)
raise
async def pause_navigation(self):
"""
暂停导航。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "pause_navigation", params)
except Exception as e:
logger.error(f"Failed to call pause_navigation: {str(e)}", exc_info=True)
raise
async def resume_navigation(self):
"""
恢复导航。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "resume_navigation", params)
except Exception as e:
logger.error(f"Failed to call resume_navigation: {str(e)}", exc_info=True)
raise
# 新增充电相关命令
async def start_recharge(self):
"""
开始充电。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("recharge", "start", params)
except Exception as e:
logger.error(f"Failed to call start_recharge: {str(e)}", exc_info=True)
raise
async def stop_recharge(self):
"""
停止充电。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("recharge", "stop", params)
except Exception as e:
logger.error(f"Failed to call stop_recharge: {str(e)}", exc_info=True)
raise
# 新增服务管理相关命令
async def list_services(self, status_filter: Optional[int] = None):
"""
列出服务。
Args:
- status_filter (int, optional): 状态过滤器
Returns:
str: 命令执行结果消息
"""
try:
params = {}
if status_filter is not None:
params["status"] = status_filter
return await self.pubCmd("rsc", "list", params)
except Exception as e:
logger.error(f"Failed to call list_services: {str(e)}", exc_info=True)
raise
async def switch_service(self, service_name: str, enable: bool):
"""
切换服务状态。
Args:
- service_name (str): 服务名称
- enable (bool): 是否启用
Returns:
str: 命令执行结果消息
"""
if service_name:
try:
params = {"name": service_name, "enable": enable}
return await self.pubCmd("rsc", "switch", params)
except Exception as e:
logger.error(f"Failed to call switch_service: {str(e)}", exc_info=True)
raise
async def set_report_freq(self, interval: int, duration: int):
"""
设置报告频率。
Args:
- interval (int): 间隔时间
- duration (int): 持续时间
Returns:
str: 命令执行结果消息
"""
try:
params = {"interval": interval, "duration": duration}
return await self.pubCmd("rsc", "freq", params)
except Exception as e:
logger.error(f"Failed to call set_report_freq: {str(e)}", exc_info=True)
raise
# 新增带参数的运动命令
async def euler(self, roll: float, pitch: float, yaw: float):
"""
欧拉角控制。
Args:
- roll (float): 横滚角
- pitch (float): 俯仰角
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"roll": roll, "pitch": pitch, "yaw": yaw}
return await self.pubCmd("sport", "Euler", params)
except Exception as e:
logger.error(f"Failed to call euler: {str(e)}", exc_info=True)
raise
async def move(self, vx: float, vy: float, vyaw: float):
"""
移动控制。
Args:
- vx (float): x方向速度
- vy (float): y方向速度
- vyaw (float): 角速度
Returns:
str: 命令执行结果消息
"""
try:
params = {"vx": vx, "vy": vy, "vyaw": vyaw}
return await self.pubCmd("sport", "Move", params)
except Exception as e:
logger.error(f"Failed to call move: {str(e)}", exc_info=True)
raise
async def set_speed_level(self, level: int):
"""
设置速度等级。
Args:
- level (int): 速度等级
Returns:
str: 命令执行结果消息
"""
try:
params = {"level": level}
return await self.pubCmd("sport", "SpeedLevel", params)
except Exception as e:
logger.error(f"Failed to call set_speed_level: {str(e)}", exc_info=True)
raise
async def pose(self, flag: bool):
"""
姿势控制。
Args:
- flag (bool): 姿势标志
Returns:
str: 命令执行结果消息
"""
try:
params = {"flag": flag}
return await self.pubCmd("sport", "Pose", params)
except Exception as e:
logger.error(f"Failed to call pose: {str(e)}", exc_info=True)
raise
async def serve() -> None:
server = Server("terminal_go2_mcp")
mmhandler = get_mcpmqtt_handler()
mcp_server = TerminalGo2McpServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
# Tool(
# name="nav_to",
# description="机器人导航去某个地方。",
# inputSchema={
# "type": "object",
# "properties": {
# "location": {
# "type": "string",
# "description": "目标地点名称",
# "minLength": 1
# }
# },
# "required": ["location"]
# }
# ),
# Tool(
# name="speak",
# description="机器人说某些话。",
# inputSchema={
# "type": "object",
# "properties": {
# "speech": {
# "type": "string",
# "description": "说话内容",
# "minLength": 1
# }
# },
# "required": ["speech"]
# }
# ),
Tool(
name="switch_mode",
description="切换机器人模式。运控模式:ai、normal、advanced",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "模式名称",
"minLength": 1
}
},
"required": ["name"]
}
),
Tool(
name="custom_action",
description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RiseSit站起(相对于坐下)、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"description": "动作名称",
"minLength": 1
}
},
"required": ["action"]
}
),
Tool(
name="StartContinuousMove",
description="开始连续移动。",
inputSchema={
"type": "object",
"properties": {
"vx": {
"type": "number",
"description": "x方向速度"
},
"vy": {
"type": "number",
"description": "y方向速度"
},
"vyaw": {
"type": "number",
"description": "偏航角速度"
}
},
"required": ["vx", "vy", "vyaw"]
}
),
Tool(
name="StopContinuousMove",
description="停止连续移动。",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="obstacle_avoidance_switch",
description="开启/关闭避障功能。",
inputSchema={
"type": "object",
"properties": {
"enable": {
"type": "boolean",
"description": "是否开启避障"
}
},
"required": ["enable"]
}
),
Tool(
name="obstacle_move",
description="避障移动。",
inputSchema={
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "x坐标"
},
"y": {
"type": "number",
"description": "y坐标"
},
"yaw": {
"type": "number",
"description": "偏航角"
}
},
"required": ["x", "y", "yaw"]
}
),
# Tool(
# name="move_to_absolute_position",
# description="移动到绝对位置。",
# inputSchema={
# "type": "object",
# "properties": {
# "x": {
# "type": "number",
# "description": "x坐标"
# },
# "y": {
# "type": "number",
# "description": "y坐标"
# },
# "yaw": {
# "type": "number",
# "description": "偏航角"
# }
# },
# "required": ["x", "y", "yaw"]
# }
# ),
# Tool(
# name="move_to_increment_position",
# description="移动到相对位置。",
# inputSchema={
# "type": "object",
# "properties": {
# "x": {
# "type": "number",
# "description": "x坐标增量"
# },
# "y": {
# "type": "number",
# "description": "y坐标增量"
# },
# "yaw": {
# "type": "number",
# "description": "偏航角增量"
# }
# },
# "required": ["x", "y", "yaw"]
# }
# ),
# Tool(
# name="start_mapping",
# description="开始建图。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="end_mapping",
# description="结束建图并保存地图。",
# inputSchema={
# "type": "object",
# "properties": {
# "address": {
# "type": "string",
# "description": "地图保存地址",
# "minLength": 1
# }
# },
# "required": ["address"]
# }
# ),
# Tool(
# name="initialize_pose",
# description="初始化机器人位姿。",
# inputSchema={
# "type": "object",
# "properties": {
# "x": {"type": "number", "description": "x坐标"},
# "y": {"type": "number", "description": "y坐标"},
# "z": {"type": "number", "description": "z坐标"},
# "q_x": {"type": "number", "description": "四元数x"},
# "q_y": {"type": "number", "description": "四元数y"},
# "q_z": {"type": "number", "description": "四元数z"},
# "q_w": {"type": "number", "description": "四元数w"},
# "address": {
# "type": "string",
# "description": "地图地址",
# "minLength": 1
# }
# },
# "required": ["x", "y", "z", "q_x", "q_y", "q_z", "q_w", "address"]
# }
# ),
# Tool(
# name="pose_navigation",
# description="位姿导航。",
# inputSchema={
# "type": "object",
# "properties": {
# "target_x": {"type": "number", "description": "目标x坐标"},
# "target_y": {"type": "number", "description": "目标y坐标"},
# "target_z": {"type": "number", "description": "目标z坐标"},
# "target_q_x": {"type": "number", "description": "目标四元数x"},
# "target_q_y": {"type": "number", "description": "目标四元数y"},
# "target_q_z": {"type": "number", "description": "目标四元数z"},
# "target_q_w": {"type": "number", "description": "目标四元数w"},
# "mode": {"type": "integer", "description": "导航模式"},
# "speed": {"type": "number", "description": "导航速度"}
# },
# "required": ["target_x", "target_y", "target_z",
# "target_q_x", "target_q_y", "target_q_z", "target_q_w", "mode", "speed"]
# }
# ),
# Tool(
# name="pause_navigation",
# description="暂停导航。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="resume_navigation",
# description="恢复导航。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="start_recharge",
# description="开始充电。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="stop_recharge",
# description="停止充电。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
Tool(
name="list_services",
description="列出服务。",
inputSchema={
"type": "object",
"properties": {
"status_filter": {
"type": "integer",
"description": "状态过滤器(可选)"
}
},
"required": []
}
),
Tool(
name="switch_service",
description="切换服务状态。",
inputSchema={
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "服务名称",
"minLength": 1
},
"enable": {
"type": "boolean",
"description": "是否启用"
}
},
"required": ["service_name", "enable"]
}
),
Tool(
name="set_report_freq",
description="设置报告频率。",
inputSchema={
"type": "object",
"properties": {
"interval": {
"type": "integer",
"description": "间隔时间"
},
"duration": {
"type": "integer",
"description": "持续时间"
}
},
"required": ["interval", "duration"]
}
),
Tool(
name="euler",
description="欧拉角控制,即站立和行走时的姿态。(roll: 取值范围 [-0.75~0.75] (rad); pitch: 取值范围 [-0.75~0.75] (rad); yaw: 取值范围 [-0.6~0.6] (rad))",
inputSchema={
"type": "object",
"properties": {
"roll": {
"type": "number",
"description": "横滚角"
},
"pitch": {
"type": "number",
"description": "俯仰角"
},
"yaw": {
"type": "number",
"description": "偏航角"
}
},
"required": ["roll", "pitch", "yaw"]
}
),
Tool(
name="move",
description="移动控制。vx: 取值范围[-2.5~3.8] (m/s); vy: 取值范围[-1.0~1.0] (m/s); vyaw: 取值范围[-4~4] (rad/s)",
inputSchema={
"type": "object",
"properties": {
"vx": {
"type": "number",
"description": "x方向速度"
},
"vy": {
"type": "number",
"description": "y方向速度"
},
"vyaw": {
"type": "number",
"description": "角速度"
}
},
"required": ["vx", "vy", "vyaw"]
}
),
Tool(
name="set_speed_level",
description="设置速度等级。level: 取值范围[-1、0、1] (慢速,正常,快速)",
inputSchema={
"type": "object",
"properties": {
"level": {
"type": "integer",
"description": "速度等级(-1为慢速,0为正常,1为快速)",
"enum": [-1, 0, 1]
}
},
"required": ["level"]
}
),
Tool(
name="pose",
description="姿势控制。",
inputSchema={
"type": "object",
"properties": {
"flag": {
"type": "boolean",
"description": "姿势标志(设置true为摆姿式,false为恢复)"
}
},
"required": ["flag"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
if name == "switch_mode":
result = await mcp_server.switch_mode(arguments["name"])
elif name == "custom_action":
result = await mcp_server.custom_action(arguments["action"])
elif name == "StartContinuousMove":
result = await mcp_server.StartContinuousMove(arguments["vx"], arguments["vy"], arguments["vyaw"])
elif name == "StopContinuousMove":
result = await mcp_server.StopContinuousMove()
elif name == "save_position":
result = await mcp_server.save_position(arguments["location_name"])
elif name == "obstacle_avoidance_switch":
result = await mcp_server.obstacle_avoidance_switch(arguments["enable"])
elif name == "obstacle_move":
result = await mcp_server.obstacle_move(arguments["x"], arguments["y"], arguments["yaw"])
# elif name == "move_to_absolute_position":
# result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"])
# elif name == "move_to_increment_position":
# result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"])
# elif name == "start_mapping":
# result = await mcp_server.start_mapping()
# elif name == "end_mapping":
# result = await mcp_server.end_mapping(arguments["address"])
# elif name == "initialize_pose":
# result = await mcp_server.initialize_pose(
# arguments["x"], arguments["y"], arguments["z"],
# arguments["q_x"], arguments["q_y"], arguments["q_z"], arguments["q_w"],
# arguments["address"]
# )
# elif name == "pose_navigation":
# result = await mcp_server.pose_navigation(
# arguments["target_x"], arguments["target_y"], arguments["target_z"],
# arguments["target_q_x"], arguments["target_q_y"], arguments["target_q_z"], arguments["target_q_w"],
# arguments["mode"], arguments["speed"]
# )
# elif name == "pause_navigation":
# result = await mcp_server.pause_navigation()
# elif name == "resume_navigation":
# result = await mcp_server.resume_navigation()
# elif name == "start_recharge":
# result = await mcp_server.start_recharge()
# elif name == "stop_recharge":
# result = await mcp_server.stop_recharge()
elif name == "list_services":
result = await mcp_server.list_services(arguments.get("status_filter"))
elif name == "switch_service":
result = await mcp_server.switch_service(arguments["service_name"], arguments["enable"])
elif name == "set_report_freq":
result = await mcp_server.set_report_freq(arguments["interval"], arguments["duration"])
elif name == "euler":
result = await mcp_server.euler(arguments["roll"], arguments["pitch"], arguments["yaw"])
elif name == "move":
result = await mcp_server.move(arguments["vx"], arguments["vy"], arguments["vyaw"])
elif name == "set_speed_level":
result = await mcp_server.set_speed_level(arguments["level"])
elif name == "pose":
result = await mcp_server.pose(arguments["flag"])
else:
raise ValueError(f"unknown tool name: {name}")
return [TextContent(text=result)]
except Exception as e:
logger.error(f"Failed to call tool {name}: {str(e)}", exc_info=True)
return [TextContent(text=f"Failed to call tool {name}: {str(e)}")]
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main():
asyncio.run(serve())
if __name__ == "__main__":
main()