From ff39bdbd8ad626d376ca5c97151b91e2c25943ae Mon Sep 17 00:00:00 2001 From: Sucan126 <632190820@qq.com> Date: Thu, 9 Oct 2025 19:33:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=9C=BA=E5=99=A8?= =?UTF-8?q?=E4=BA=BA=E6=A8=A1=E5=BC=8F=E5=88=87=E6=8D=A2=E5=92=8C=E8=BF=9E?= =?UTF-8?q?=E7=BB=AD=E7=A7=BB=E5=8A=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增switch_mode方法用于切换机器人运控模式 - 添加ContinuousMove方法实现机器人连续移动控制 - 更新工具调用处理以支持新功能 - 调整版本号至0.0.9 --- terminal_go2_mcp/pyproject.toml | 2 +- terminal_go2_mcp/terminal_go2_mcp/main.py | 209 ++++++++++++++-------- 2 files changed, 134 insertions(+), 77 deletions(-) diff --git a/terminal_go2_mcp/pyproject.toml b/terminal_go2_mcp/pyproject.toml index a92f88a..f91490b 100644 --- a/terminal_go2_mcp/pyproject.toml +++ b/terminal_go2_mcp/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "terminal_go2_mcp" -version = "0.0.5" +version = "0.0.9" description = "MQTT-based navigation server for robot" requires-python = ">=3.10" dependencies = [ diff --git a/terminal_go2_mcp/terminal_go2_mcp/main.py b/terminal_go2_mcp/terminal_go2_mcp/main.py index 0ae6d53..642fd18 100644 --- a/terminal_go2_mcp/terminal_go2_mcp/main.py +++ b/terminal_go2_mcp/terminal_go2_mcp/main.py @@ -35,6 +35,25 @@ class TerminalGo2McpServer: logger.error(f"Failed to publish command: {str(e)}", exc_info=True) return f"Failed to publish command: {str(e)}" + async def switch_mode(self, name: str): + """ + 切换机器人模式。 + Args: + - name (str): 模式名称 + Returns: + str: 命令执行结果消息 + Raises: + ValueError: 如果参数无效或为空 + Exception: 如果MQTT发布失败 + """ + if name: + try: + params = {"name": name} + return await self.pubCmd("msc", "SelectMode", params) + except Exception as e: + logger.error(f"Failed to call switch_mode mcp-tool: {str(e)} ", exc_info=True) + raise + async def custom_action(self, action: str): """ 机器人自定义动作展示。 @@ -53,25 +72,27 @@ class TerminalGo2McpServer: except Exception as e: logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True) raise - - async def save_position(self, location_name: str): + + async def ContinuousMove(self, on: bool, vx: float, vy: float, vyaw: float): """ - 机器人保存具体地点。 + 开始连续移动。 Args: - - location_name (str): 地点名称 + - on (bool): 是否开始连续移动 + - vx (float): x方向速度 + - vy (float): y方向速度 + - vyaw (float): 偏航角速度 Returns: str: 命令执行结果消息 Raises: ValueError: 如果参数无效或为空 Exception: 如果MQTT发布失败 """ - if location_name: - try: - params = {"location_name": location_name} - return await self.pubCmd("nav", "save_location", params) - except Exception as e: - logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True) - raise + try: + params = {"on": on, "vx": vx, "vy": vy, "vyaw": vyaw} + return await self.pubCmd("sport", "ContinuousMove", params) + except Exception as e: + logger.error(f"Failed to call ContinuousMove: {str(e)}", exc_info=True) + raise # 新增OAC相关命令 async def obstacle_avoidance_switch(self, enable: bool): @@ -428,9 +449,24 @@ async def serve() -> None: # "required": ["speech"] # } # ), + Tool( + name="switch_mode", + description="切换机器人模式。运控模式:ai、normal、advanced", + inputSchema={ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "模式名称", + "minLength": 1 + } + }, + "required": ["name"] + } + ), Tool( name="custom_action", - description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。", + description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RiseSit站起(相对于坐下)、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。", inputSchema={ "type": "object", "properties": { @@ -443,21 +479,36 @@ async def serve() -> None: "required": ["action"] } ), - # Tool( - # name="save_position", - # description="机器人保存具体地点。", - # inputSchema={ - # "type": "object", - # "properties": { - # "location_name": { - # "type": "string", - # "description": "地点名称", - # "minLength": 1 - # } - # }, - # "required": ["location_name"] - # } - # ), + Tool( + name="StartContinuousMove", + description="开始连续移动。", + inputSchema={ + "type": "object", + "properties": { + "vx": { + "type": "number", + "description": "x方向速度" + }, + "vy": { + "type": "number", + "description": "y方向速度" + }, + "vyaw": { + "type": "number", + "description": "偏航角速度" + } + }, + "required": ["vx", "vy", "vyaw"] + } + ), + Tool( + name="StopContinuousMove", + description="停止连续移动。", + inputSchema={ + "type": "object", + "properties": {} + } + ), Tool( name="obstacle_avoidance_switch", description="开启/关闭避障功能。", @@ -494,50 +545,50 @@ async def serve() -> None: "required": ["x", "y", "yaw"] } ), - Tool( - name="move_to_absolute_position", - description="移动到绝对位置。", - inputSchema={ - "type": "object", - "properties": { - "x": { - "type": "number", - "description": "x坐标" - }, - "y": { - "type": "number", - "description": "y坐标" - }, - "yaw": { - "type": "number", - "description": "偏航角" - } - }, - "required": ["x", "y", "yaw"] - } - ), - Tool( - name="move_to_increment_position", - description="移动到相对位置。", - inputSchema={ - "type": "object", - "properties": { - "x": { - "type": "number", - "description": "x坐标增量" - }, - "y": { - "type": "number", - "description": "y坐标增量" - }, - "yaw": { - "type": "number", - "description": "偏航角增量" - } - }, - "required": ["x", "y", "yaw"] - } - ), + # Tool( + # name="move_to_absolute_position", + # description="移动到绝对位置。", + # inputSchema={ + # "type": "object", + # "properties": { + # "x": { + # "type": "number", + # "description": "x坐标" + # }, + # "y": { + # "type": "number", + # "description": "y坐标" + # }, + # "yaw": { + # "type": "number", + # "description": "偏航角" + # } + # }, + # "required": ["x", "y", "yaw"] + # } + # ), + # Tool( + # name="move_to_increment_position", + # description="移动到相对位置。", + # inputSchema={ + # "type": "object", + # "properties": { + # "x": { + # "type": "number", + # "description": "x坐标增量" + # }, + # "y": { + # "type": "number", + # "description": "y坐标增量" + # }, + # "yaw": { + # "type": "number", + # "description": "偏航角增量" + # } + # }, + # "required": ["x", "y", "yaw"] + # } + # ), # Tool( # name="start_mapping", # description="开始建图。", @@ -770,18 +821,24 @@ async def serve() -> None: async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]: """处理工具调用""" try: - if name == "custom_action": + if name == "switch_mode": + result = await mcp_server.switch_mode(arguments["name"]) + elif name == "custom_action": result = await mcp_server.custom_action(arguments["action"]) + elif name == "StartContinuousMove": + result = await mcp_server.StartContinuousMove(arguments["vx"], arguments["vy"], arguments["vyaw"]) + elif name == "StopContinuousMove": + result = await mcp_server.StopContinuousMove() elif name == "save_position": result = await mcp_server.save_position(arguments["location_name"]) elif name == "obstacle_avoidance_switch": result = await mcp_server.obstacle_avoidance_switch(arguments["enable"]) elif name == "obstacle_move": result = await mcp_server.obstacle_move(arguments["x"], arguments["y"], arguments["yaw"]) - elif name == "move_to_absolute_position": - result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"]) - elif name == "move_to_increment_position": - result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"]) + # elif name == "move_to_absolute_position": + # result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"]) + # elif name == "move_to_increment_position": + # result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"]) # elif name == "start_mapping": # result = await mcp_server.start_mapping() # elif name == "end_mapping":