feat: 重构MQTT处理类并新增机器人控制功能

- 将MCPMQTT类重构为MQTTHandler,改进MQTT连接配置管理
- 新增从配置中心获取MQTT配置的功能
- 新增多种机器人控制命令包括避障、导航、充电等服务
- 更新pyproject.toml版本号并添加requests依赖
- 优化日志记录和错误处理机制
This commit is contained in:
2025-10-09 16:30:18 +08:00
parent 65792ccd89
commit a0b4b4e1aa
3 changed files with 814 additions and 214 deletions

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "terminal_go2_mcp" name = "terminal_go2_mcp"
version = "0.0.1" version = "0.0.5"
description = "MQTT-based navigation server for robot" description = "MQTT-based navigation server for robot"
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [ dependencies = [
@@ -14,6 +14,7 @@ dependencies = [
"pydantic>=1.10.0", "pydantic>=1.10.0",
"python-dotenv>=0.21.0", "python-dotenv>=0.21.0",
"mcp[cli]>=1.6.0", "mcp[cli]>=1.6.0",
"requests>=2.25.0"
] ]
[project.scripts] [project.scripts]

View File

@@ -8,7 +8,7 @@ import asyncio
import json import json
import time import time
import uuid import uuid
from mcp_mqtt import get_mqtt_handler from .mcp_mqtt import get_mcpmqtt_handler
# 配置日志 # 配置日志
logging.basicConfig( logging.basicConfig(
@@ -20,76 +20,25 @@ logger = logging.getLogger(__name__)
class TerminalGo2McpServer: class TerminalGo2McpServer:
def __init__(self, mmhandler=None): def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mqtt_handler() self.mmhandler = mmhandler or get_mcpmqtt_handler()
async def pubCmd(self, task_id: str, device_id: str, cmd: str, params: dict): async def pubCmd(self, cmd_type: str, cmd: str, params: dict):
try: try:
payload = { payload = {
"task_id": task_id, "type": cmd_type,
"device_id": device_id,
"cmd": cmd, "cmd": cmd,
"data": params["data"] "param": params
} }
self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2) self.mmhandler.client.publish("unitree/go2/cmd", json.dumps(payload), qos=2)
return "任务发布完成" return f"{cmd}任务已经下达完成"
except Exception as e: except Exception as e:
logger.error(f"Failed to publish {cmd} command: {str(e)}", exc_info=True) logger.error(f"Failed to publish command: {str(e)}", exc_info=True)
raise return f"Failed to publish command: {str(e)}"
async def nav_to(self, task_id: str, device_id: str, location: str): async def custom_action(self, action: str):
"""
机器人导航去某个地方。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- location (str): 目标地点名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and location:
try:
params = {
"data": {"location": location}
}
return await self.pubCmd(task_id, device_id, "nav", params)
except Exception as e:
logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True)
raise
async def speak(self, task_id: str, device_id: str, speech: str):
"""
机器人说某些话。
Args:
- task_id (str): 任务id
- device_id (str): 设备id
- speech (str): 说话内容
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if task_id and device_id and speech:
try:
params = {
"data": {"speech": speech}
}
return await self.pubCmd(task_id, device_id, "speak", params)
except Exception as e:
logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True)
raise
async def custom_action(self, task_id: str, device_id: str, action: str):
""" """
机器人自定义动作展示。 机器人自定义动作展示。
Args: Args:
- task_id (str): 任务id
- device_id (str): 设备id
- action (str): 动作名称 - action (str): 动作名称
Returns: Returns:
str: 命令执行结果消息 str: 命令执行结果消息
@@ -98,24 +47,17 @@ class TerminalGo2McpServer:
ValueError: 如果参数无效或为空 ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败 Exception: 如果MQTT发布失败
""" """
if task_id and device_id and action: if action:
try: try:
params = { return await self.pubCmd("sport", action, {})
"data": {
"action": action
}
}
return await self.pubCmd(task_id, device_id, "action", params)
except Exception as e: except Exception as e:
logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True) logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True)
raise raise
async def save_position(self, task_id: str, device_id: str, location_name: str): async def save_position(self, location_name: str):
""" """
机器人保存具体地点。 机器人保存具体地点。
Args: Args:
- task_id (str): 任务id
- device_id (str): 设备id
- location_name (str): 地点名称 - location_name (str): 地点名称
Returns: Returns:
str: 命令执行结果消息 str: 命令执行结果消息
@@ -123,101 +65,703 @@ class TerminalGo2McpServer:
ValueError: 如果参数无效或为空 ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败 Exception: 如果MQTT发布失败
""" """
if task_id and device_id and location_name: if location_name:
try: try:
params = { params = {"location_name": location_name}
"data": { return await self.pubCmd("nav", "save_location", params)
"location_name": location_name
}
}
return await self.pubCmd(task_id, device_id, "saveLocation", params)
except Exception as e: except Exception as e:
logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True) logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True)
raise raise
# 新增OAC相关命令
async def obstacle_avoidance_switch(self, enable: bool):
"""
开启/关闭避障功能。
Args:
- enable (bool): 是否开启避障
Returns:
str: 命令执行结果消息
"""
try:
params = {"enable": enable}
return await self.pubCmd("oac", "SwitchSet", params)
except Exception as e:
logger.error(f"Failed to call obstacle_avoidance_switch: {str(e)}", exc_info=True)
raise
async def obstacle_move(self, x: float, y: float, yaw: float):
"""
避障移动。
Args:
- x (float): x坐标
- y (float): y坐标
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "Move", params)
except Exception as e:
logger.error(f"Failed to call obstacle_move: {str(e)}", exc_info=True)
raise
async def move_to_absolute_position(self, x: float, y: float, yaw: float):
"""
移动到绝对位置。
Args:
- x (float): x坐标
- y (float): y坐标
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "MoveToAbsolutePosition", params)
except Exception as e:
logger.error(f"Failed to call move_to_absolute_position: {str(e)}", exc_info=True)
raise
async def move_to_increment_position(self, x: float, y: float, yaw: float):
"""
移动到相对位置。
Args:
- x (float): x坐标增量
- y (float): y坐标增量
- yaw (float): 偏航角增量
Returns:
str: 命令执行结果消息
"""
try:
params = {"x": x, "y": y, "yaw": yaw}
return await self.pubCmd("oac", "MoveToIncrementPosition", params)
except Exception as e:
logger.error(f"Failed to call move_to_increment_position: {str(e)}", exc_info=True)
raise
# 新增导航相关命令
async def start_mapping(self):
"""
开始建图。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "start_mapping", params)
except Exception as e:
logger.error(f"Failed to call start_mapping: {str(e)}", exc_info=True)
raise
async def end_mapping(self, address: str):
"""
结束建图并保存地图。
Args:
- address (str): 地图保存地址
Returns:
str: 命令执行结果消息
"""
if address:
try:
params = {"address": address}
return await self.pubCmd("nav", "end_mapping", params)
except Exception as e:
logger.error(f"Failed to call end_mapping: {str(e)}", exc_info=True)
raise
async def initialize_pose(self, x: float, y: float, z: float,
q_x: float, q_y: float, q_z: float, q_w: float, address: str):
"""
初始化机器人位姿。
Args:
- x, y, z (float): 位置坐标
- q_x, q_y, q_z, q_w (float): 四元数姿态
- address (str): 地图地址
Returns:
str: 命令执行结果消息
"""
if address:
try:
params = {
"x": x, "y": y, "z": z,
"q_x": q_x, "q_y": q_y, "q_z": q_z, "q_w": q_w,
"address": address
}
return await self.pubCmd("nav", "initialize_pose", params)
except Exception as e:
logger.error(f"Failed to call initialize_pose: {str(e)}", exc_info=True)
raise
async def pose_navigation(self, target_x: float, target_y: float, target_z: float,
target_q_x: float, target_q_y: float, target_q_z: float, target_q_w: float,
mode: int, speed: float):
"""
位姿导航。
Args:
- target_x, target_y, target_z (float): 目标位置坐标
- target_q_x, target_q_y, target_q_z, target_q_w (float): 目标四元数姿态
- mode (int): 导航模式
- speed (float): 导航速度
Returns:
str: 命令执行结果消息
"""
try:
params = {
"targetPose": {
"x": target_x, "y": target_y, "z": target_z,
"q_x": target_q_x, "q_y": target_q_y, "q_z": target_q_z, "q_w": target_q_w
},
"mode": mode,
"speed": speed
}
return await self.pubCmd("nav", "pose_navigation", params)
except Exception as e:
logger.error(f"Failed to call pose_navigation: {str(e)}", exc_info=True)
raise
async def pause_navigation(self):
"""
暂停导航。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "pause_navigation", params)
except Exception as e:
logger.error(f"Failed to call pause_navigation: {str(e)}", exc_info=True)
raise
async def resume_navigation(self):
"""
恢复导航。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("nav", "resume_navigation", params)
except Exception as e:
logger.error(f"Failed to call resume_navigation: {str(e)}", exc_info=True)
raise
# 新增充电相关命令
async def start_recharge(self):
"""
开始充电。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("recharge", "start", params)
except Exception as e:
logger.error(f"Failed to call start_recharge: {str(e)}", exc_info=True)
raise
async def stop_recharge(self):
"""
停止充电。
Returns:
str: 命令执行结果消息
"""
try:
params = {}
return await self.pubCmd("recharge", "stop", params)
except Exception as e:
logger.error(f"Failed to call stop_recharge: {str(e)}", exc_info=True)
raise
# 新增服务管理相关命令
async def list_services(self, status_filter: Optional[int] = None):
"""
列出服务。
Args:
- status_filter (int, optional): 状态过滤器
Returns:
str: 命令执行结果消息
"""
try:
params = {}
if status_filter is not None:
params["status"] = status_filter
return await self.pubCmd("rsc", "list", params)
except Exception as e:
logger.error(f"Failed to call list_services: {str(e)}", exc_info=True)
raise
async def switch_service(self, service_name: str, enable: bool):
"""
切换服务状态。
Args:
- service_name (str): 服务名称
- enable (bool): 是否启用
Returns:
str: 命令执行结果消息
"""
if service_name:
try:
params = {"name": service_name, "enable": enable}
return await self.pubCmd("rsc", "switch", params)
except Exception as e:
logger.error(f"Failed to call switch_service: {str(e)}", exc_info=True)
raise
async def set_report_freq(self, interval: int, duration: int):
"""
设置报告频率。
Args:
- interval (int): 间隔时间
- duration (int): 持续时间
Returns:
str: 命令执行结果消息
"""
try:
params = {"interval": interval, "duration": duration}
return await self.pubCmd("rsc", "freq", params)
except Exception as e:
logger.error(f"Failed to call set_report_freq: {str(e)}", exc_info=True)
raise
# 新增带参数的运动命令
async def euler(self, roll: float, pitch: float, yaw: float):
"""
欧拉角控制。
Args:
- roll (float): 横滚角
- pitch (float): 俯仰角
- yaw (float): 偏航角
Returns:
str: 命令执行结果消息
"""
try:
params = {"roll": roll, "pitch": pitch, "yaw": yaw}
return await self.pubCmd("sport", "Euler", params)
except Exception as e:
logger.error(f"Failed to call euler: {str(e)}", exc_info=True)
raise
async def move(self, vx: float, vy: float, vyaw: float):
"""
移动控制。
Args:
- vx (float): x方向速度
- vy (float): y方向速度
- vyaw (float): 角速度
Returns:
str: 命令执行结果消息
"""
try:
params = {"vx": vx, "vy": vy, "vyaw": vyaw}
return await self.pubCmd("sport", "Move", params)
except Exception as e:
logger.error(f"Failed to call move: {str(e)}", exc_info=True)
raise
async def set_speed_level(self, level: int):
"""
设置速度等级。
Args:
- level (int): 速度等级
Returns:
str: 命令执行结果消息
"""
try:
params = {"level": level}
return await self.pubCmd("sport", "SpeedLevel", params)
except Exception as e:
logger.error(f"Failed to call set_speed_level: {str(e)}", exc_info=True)
raise
async def pose(self, flag: bool):
"""
姿势控制。
Args:
- flag (bool): 姿势标志
Returns:
str: 命令执行结果消息
"""
try:
params = {"flag": flag}
return await self.pubCmd("sport", "Pose", params)
except Exception as e:
logger.error(f"Failed to call pose: {str(e)}", exc_info=True)
raise
async def serve() -> None: async def serve() -> None:
server = Server("terminal_go2_mcp") server = Server("terminal_go2_mcp")
mmhandler = get_mqtt_handler() mmhandler = get_mcpmqtt_handler()
mcp_server = TerminalGo2McpServer(mmhandler) mcp_server = TerminalGo2McpServer(mmhandler)
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
"""列出所有工具""" """列出所有工具"""
return [ return [
Tool( # Tool(
name="nav_to", # name="nav_to",
description="机器人导航去某个地方。", # description="机器人导航去某个地方。",
inputSchema={ # inputSchema={
"type": "object", # "type": "object",
"properties": { # "properties": {
"task_id": { # "location": {
"type": "string", # "type": "string",
"description": "任务ID", # "description": "目标地点名称",
"minLength": 1 # "minLength": 1
}, # }
"device_id": { # },
"type": "string", # "required": ["location"]
"description": "设备ID", # }
"minLength": 1 # ),
}, # Tool(
"location": { # name="speak",
"type": "string", # description="机器人说某些话。",
"description": "目标地点名称", # inputSchema={
"minLength": 1 # "type": "object",
} # "properties": {
}, # "speech": {
"required": ["task_id", "device_id", "location"] # "type": "string",
} # "description": "说话内容",
), # "minLength": 1
Tool( # }
name="speak", # },
description="机器人说某些话。", # "required": ["speech"]
inputSchema={ # }
"type": "object", # ),
"properties": {
"task_id": {
"type": "string",
"description": "任务ID",
"minLength": 1
},
"device_id": {
"type": "string",
"description": "设备ID",
"minLength": 1
},
"speech": {
"type": "string",
"description": "说话内容",
"minLength": 1
}
},
"required": ["task_id", "device_id", "speech"]
}
),
Tool( Tool(
name="custom_action", name="custom_action",
description="机器人自定义动作展示。", description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"task_id": {
"type": "string",
"description": "任务ID",
"minLength": 1
},
"device_id": {
"type": "string",
"description": "设备ID",
"minLength": 1
},
"action": { "action": {
"type": "string", "type": "string",
"description": "动作名称", "description": "动作名称",
"minLength": 1 "minLength": 1
} }
}, },
"required": ["task_id", "device_id", "action"] "required": ["action"]
}
),
# Tool(
# name="save_position",
# description="机器人保存具体地点。",
# inputSchema={
# "type": "object",
# "properties": {
# "location_name": {
# "type": "string",
# "description": "地点名称",
# "minLength": 1
# }
# },
# "required": ["location_name"]
# }
# ),
Tool(
name="obstacle_avoidance_switch",
description="开启/关闭避障功能。",
inputSchema={
"type": "object",
"properties": {
"enable": {
"type": "boolean",
"description": "是否开启避障"
}
},
"required": ["enable"]
}
),
Tool(
name="obstacle_move",
description="避障移动。",
inputSchema={
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "x坐标"
},
"y": {
"type": "number",
"description": "y坐标"
},
"yaw": {
"type": "number",
"description": "偏航角"
}
},
"required": ["x", "y", "yaw"]
}
),
Tool(
name="move_to_absolute_position",
description="移动到绝对位置。",
inputSchema={
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "x坐标"
},
"y": {
"type": "number",
"description": "y坐标"
},
"yaw": {
"type": "number",
"description": "偏航角"
}
},
"required": ["x", "y", "yaw"]
}
),
Tool(
name="move_to_increment_position",
description="移动到相对位置。",
inputSchema={
"type": "object",
"properties": {
"x": {
"type": "number",
"description": "x坐标增量"
},
"y": {
"type": "number",
"description": "y坐标增量"
},
"yaw": {
"type": "number",
"description": "偏航角增量"
}
},
"required": ["x", "y", "yaw"]
}
),
# Tool(
# name="start_mapping",
# description="开始建图。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="end_mapping",
# description="结束建图并保存地图。",
# inputSchema={
# "type": "object",
# "properties": {
# "address": {
# "type": "string",
# "description": "地图保存地址",
# "minLength": 1
# }
# },
# "required": ["address"]
# }
# ),
# Tool(
# name="initialize_pose",
# description="初始化机器人位姿。",
# inputSchema={
# "type": "object",
# "properties": {
# "x": {"type": "number", "description": "x坐标"},
# "y": {"type": "number", "description": "y坐标"},
# "z": {"type": "number", "description": "z坐标"},
# "q_x": {"type": "number", "description": "四元数x"},
# "q_y": {"type": "number", "description": "四元数y"},
# "q_z": {"type": "number", "description": "四元数z"},
# "q_w": {"type": "number", "description": "四元数w"},
# "address": {
# "type": "string",
# "description": "地图地址",
# "minLength": 1
# }
# },
# "required": ["x", "y", "z", "q_x", "q_y", "q_z", "q_w", "address"]
# }
# ),
# Tool(
# name="pose_navigation",
# description="位姿导航。",
# inputSchema={
# "type": "object",
# "properties": {
# "target_x": {"type": "number", "description": "目标x坐标"},
# "target_y": {"type": "number", "description": "目标y坐标"},
# "target_z": {"type": "number", "description": "目标z坐标"},
# "target_q_x": {"type": "number", "description": "目标四元数x"},
# "target_q_y": {"type": "number", "description": "目标四元数y"},
# "target_q_z": {"type": "number", "description": "目标四元数z"},
# "target_q_w": {"type": "number", "description": "目标四元数w"},
# "mode": {"type": "integer", "description": "导航模式"},
# "speed": {"type": "number", "description": "导航速度"}
# },
# "required": ["target_x", "target_y", "target_z",
# "target_q_x", "target_q_y", "target_q_z", "target_q_w", "mode", "speed"]
# }
# ),
# Tool(
# name="pause_navigation",
# description="暂停导航。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="resume_navigation",
# description="恢复导航。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="start_recharge",
# description="开始充电。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
# Tool(
# name="stop_recharge",
# description="停止充电。",
# inputSchema={
# "type": "object",
# "properties": {},
# "required": []
# }
# ),
Tool(
name="list_services",
description="列出服务。",
inputSchema={
"type": "object",
"properties": {
"status_filter": {
"type": "integer",
"description": "状态过滤器(可选)"
}
},
"required": []
}
),
Tool(
name="switch_service",
description="切换服务状态。",
inputSchema={
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "服务名称",
"minLength": 1
},
"enable": {
"type": "boolean",
"description": "是否启用"
}
},
"required": ["service_name", "enable"]
}
),
Tool(
name="set_report_freq",
description="设置报告频率。",
inputSchema={
"type": "object",
"properties": {
"interval": {
"type": "integer",
"description": "间隔时间"
},
"duration": {
"type": "integer",
"description": "持续时间"
}
},
"required": ["interval", "duration"]
}
),
Tool(
name="euler",
description="欧拉角控制,即站立和行走时的姿态。(roll: 取值范围 [-0.75~0.75] (rad); pitch: 取值范围 [-0.75~0.75] (rad); yaw: 取值范围 [-0.6~0.6] (rad))",
inputSchema={
"type": "object",
"properties": {
"roll": {
"type": "number",
"description": "横滚角"
},
"pitch": {
"type": "number",
"description": "俯仰角"
},
"yaw": {
"type": "number",
"description": "偏航角"
}
},
"required": ["roll", "pitch", "yaw"]
}
),
Tool(
name="move",
description="移动控制。vx: 取值范围[-2.5~3.8] (m/s); vy: 取值范围[-1.0~1.0] (m/s); vyaw: 取值范围[-4~4] (rad/s)",
inputSchema={
"type": "object",
"properties": {
"vx": {
"type": "number",
"description": "x方向速度"
},
"vy": {
"type": "number",
"description": "y方向速度"
},
"vyaw": {
"type": "number",
"description": "角速度"
}
},
"required": ["vx", "vy", "vyaw"]
}
),
Tool(
name="set_speed_level",
description="设置速度等级。level: 取值范围[-1、0、1] (慢速,正常,快速)",
inputSchema={
"type": "object",
"properties": {
"level": {
"type": "integer",
"description": "速度等级(-1为慢速,0为正常,1为快速)",
"enum": [-1, 0, 1]
}
},
"required": ["level"]
}
),
Tool(
name="pose",
description="姿势控制。",
inputSchema={
"type": "object",
"properties": {
"flag": {
"type": "boolean",
"description": "姿势标志(设置true为摆姿式,false为恢复)"
}
},
"required": ["flag"]
} }
) )
] ]
@@ -226,41 +770,71 @@ async def serve() -> None:
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]: async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用""" """处理工具调用"""
try: try:
if name == "nav_to": if name == "custom_action":
if "params" not in arguments: result = await mcp_server.custom_action(arguments["action"])
raise ValueError("缺少必要参数: params") elif name == "save_position":
result = await mcp_server.nav_to( result = await mcp_server.save_position(arguments["location_name"])
task_id=arguments["task_id"], elif name == "obstacle_avoidance_switch":
device_id=arguments["device_id"], result = await mcp_server.obstacle_avoidance_switch(arguments["enable"])
location=arguments["params"]["location"] elif name == "obstacle_move":
) result = await mcp_server.obstacle_move(arguments["x"], arguments["y"], arguments["yaw"])
elif name == "speak": elif name == "move_to_absolute_position":
if "params" not in arguments: result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"])
raise ValueError("缺少必要参数: params") elif name == "move_to_increment_position":
result = await mcp_server.speak( result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"])
task_id=arguments["task_id"], # elif name == "start_mapping":
device_id=arguments["device_id"], # result = await mcp_server.start_mapping()
speech=arguments["params"]["speech"] # elif name == "end_mapping":
) # result = await mcp_server.end_mapping(arguments["address"])
elif name == "custom_action": # elif name == "initialize_pose":
if "params" not in arguments: # result = await mcp_server.initialize_pose(
raise ValueError("缺少必要参数: params") # arguments["x"], arguments["y"], arguments["z"],
result = await mcp_server.custom_action( # arguments["q_x"], arguments["q_y"], arguments["q_z"], arguments["q_w"],
task_id=arguments["task_id"], # arguments["address"]
device_id=arguments["device_id"], # )
action=arguments["params"]["action"] # elif name == "pose_navigation":
) # result = await mcp_server.pose_navigation(
# arguments["target_x"], arguments["target_y"], arguments["target_z"],
# arguments["target_q_x"], arguments["target_q_y"], arguments["target_q_z"], arguments["target_q_w"],
# arguments["mode"], arguments["speed"]
# )
# elif name == "pause_navigation":
# result = await mcp_server.pause_navigation()
# elif name == "resume_navigation":
# result = await mcp_server.resume_navigation()
# elif name == "start_recharge":
# result = await mcp_server.start_recharge()
# elif name == "stop_recharge":
# result = await mcp_server.stop_recharge()
elif name == "list_services":
result = await mcp_server.list_services(arguments.get("status_filter"))
elif name == "switch_service":
result = await mcp_server.switch_service(arguments["service_name"], arguments["enable"])
elif name == "set_report_freq":
result = await mcp_server.set_report_freq(arguments["interval"], arguments["duration"])
elif name == "euler":
result = await mcp_server.euler(arguments["roll"], arguments["pitch"], arguments["yaw"])
elif name == "move":
result = await mcp_server.move(arguments["vx"], arguments["vy"], arguments["vyaw"])
elif name == "set_speed_level":
result = await mcp_server.set_speed_level(arguments["level"])
elif name == "pose":
result = await mcp_server.pose(arguments["flag"])
else: else:
raise ValueError(f"不支持的工具名称: {name}") raise ValueError(f"unknown tool name: {name}")
return [TextContent(text=result)] return [TextContent(text=result)]
except Exception as e: except Exception as e:
logger.error(f"Failed to call tool {name}: {str(e)}", exc_info=True) logger.error(f"Failed to call tool {name}: {str(e)}", exc_info=True)
raise return [TextContent(text=f"Failed to call tool {name}: {str(e)}")]
options = server.create_initialization_options() options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream): async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options) await server.run(read_stream, write_stream, options)
if __name__ == "__main__": def main():
asyncio.run(serve()) asyncio.run(serve())
if __name__ == "__main__":
main()

View File

@@ -1,69 +1,94 @@
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import json import json
import logging import logging
import time
import threading import threading
from typing import Optional, Callable, Dict from typing import Optional
import uuid
import threading
import requests
from os import getenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MQTT_CLIENT_ID = f"MCPMQTT-{uuid.uuid4().hex[:8]}"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') def getConfig_url(array):
logger = logging.getLogger("MCPMQTT") url = "http://lzwcai-demp-corp-manager:8086/system/config/getConfig"
data = [array]
try:
response = requests.post(url, json=data, timeout=5)
response.raise_for_status()
data = response.json()['data']
return data[0]['configValue']
except Exception as e:
print(f"Error fetching config for {array}: {e}")
return None
MQTT_BROKER = '192.168.1.199' class MQTTHandler:
MQTT_PORT = 1883
MQTT_CLIENT_ID = "MCPMQTT"
MQTT_USERNAME = 'lzwc'
MQTT_PASSWORD = 'Lzwc@4187.'
RESPONSE_TOPIC = "unitree_response"
class MCPMQTT:
def __init__(self): def __init__(self):
self.client_ = mqtt.Client(client_id=MQTT_CLIENT_ID) self.client = mqtt.Client()
self.client_.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) self.tasks = {} # {task_id: task_data}
self.client_.on_connect = self._on_connect self._lock = threading.Lock()
self.client_.on_message = self._on_message
self.client_.on_disconnect = self._on_disconnect # 获取MQTT配置提供默认值
self._response_handlers = {} # 存储响应处理器 self.mqtt_username = getenv('MQTT_USERNAME') or 'lzwc'
self._lock = threading.Lock() # 线程锁 self.mqtt_password = getenv('MQTT_PASSWORD') or 'Lzwc@4187.'
self._response_topic = RESPONSE_TOPIC mqtt_broker_raw = getenv('MQTT_BROKER') or 'emqx'
# 移除协议前缀,只保留主机名
self.mqtt_broker = mqtt_broker_raw.replace('tcp://', '').replace('mqtt://', '')
mqtt_port_str = getenv('MQTT_PORT')
self.mqtt_port = int(mqtt_port_str) if mqtt_port_str else 1883
# 记录配置信息
logger.info(f"MQTT配置 - Broker: {self.mqtt_broker}, Port: {self.mqtt_port}, Username: {self.mqtt_username}")
if not getenv('MQTT_BROKER'):
logger.warning("MQTT_BROKER环境变量未设置使用默认值")
if not mqtt_port_str:
logger.warning("MQTT_PORT环境变量未设置使用默认端口1883")
self.client.username_pw_set(self.mqtt_username, self.mqtt_password)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
try: try:
logger.info(f"正在连接 MQTT 代理: {MQTT_BROKER}:{MQTT_PORT}") logger.info(f"正在连接 MQTT 代理: {self.mqtt_broker}:{self.mqtt_port}")
self.client_.connect(MQTT_BROKER, MQTT_PORT, 60) self.client.connect(self.mqtt_broker, self.mqtt_port, 60)
self.client_.loop_start() self.client.loop_start()
except Exception as e: except Exception as e:
logger.error(f"连接 MQTT 失败: {e}") logger.error(f"连接 MQTT 失败: {e}")
def _on_connect(self, client, userdata, flags, rc): def _on_connect(self, client, userdata, flags, rc):
"""MQTT 连接回调""" """MQTT 连接回调"""
if rc == 0: if rc == 0:
logger.info("已成功连接到 MQTT 代理服务器") logger.info("已成功连接到 MQTT 代理服务器")
client.subscribe(self._response_topic, qos=2)
logger.info(f"已订阅响应主题: {self._response_topic}")
else: else:
logger.error(f"连接 MQTT 代理服务器失败,返回码: {rc}") logger.error(f"连接 MQTT 代理服务器失败,返回码: {rc}")
def register_response_handler(self, correlation_id: str, callback: Callable): def get_status(self, task_id: str = None):
"""注册响应消息处理器""" """获取任务状态
Args:
task_id: 可选参数指定任务ID。如果为None则返回所有任务状态
Returns:
如果指定task_id则返回该任务的状态否则返回所有任务状态
"""
with self._lock: with self._lock:
self._response_handlers[correlation_id] = callback if task_id:
return self.tasks.get(task_id, {}).copy()
return {k: v.copy() for k, v in self.tasks.items()}
def _on_message(self, client, userdata, msg): def _on_message(self, client, userdata, msg):
try: try:
payload = json.loads(msg.payload.decode()) payload = json.loads(msg.payload.decode())
topic = msg.topic logger.info(f"[MQTT] topic={msg.topic} msg: {payload}")
logger.info(f"[MQTT] 收到消息 topic={topic}")
# 处理响应消息
if topic == self._response_topic and "correlation_id" in payload:
with self._lock:
handler = self._response_handlers.pop(payload["correlation_id"], None)
if handler:
handler(payload)
return
# 打印常规消息
logger.info(f"[MQTT] 消息内容: {payload}")
# task_id = payload.get('task_id')
# if task_id and task_id in self.tasks:
# with self._lock:
# self.tasks[task_id].update({
# 'status': payload.get('status', 'UNKNOWN'),
# 'description': payload.get('description', '')
# })
except Exception as e: except Exception as e:
logger.error(f"[错误] 处理消息失败: {e}") logger.error(f"[错误] 处理消息失败: {e}")
@@ -78,11 +103,11 @@ class MCPMQTT:
logger.error(f"重新连接 MQTT 失败: {e}") logger.error(f"重新连接 MQTT 失败: {e}")
# 单例模式 # 单例模式
_instance: Optional[MCPMQTT] = None _instance: Optional[MQTTHandler] = None
def get_mqtt_handler() -> MCPMQTT: def get_mcpmqtt_handler() -> MQTTHandler:
"""获取MCPMQTT单例""" """获取单例"""
global _instance global _instance
if _instance is None: if _instance is None:
_instance = MCPMQTT() _instance = MQTTHandler()
return _instance return _instance