feat: 添加机器人模式切换和连续移动功能
- 新增switch_mode方法用于切换机器人运控模式 - 添加ContinuousMove方法实现机器人连续移动控制 - 更新工具调用处理以支持新功能 - 调整版本号至0.0.9
This commit is contained in:
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "terminal_go2_mcp"
|
||||
version = "0.0.5"
|
||||
version = "0.0.9"
|
||||
description = "MQTT-based navigation server for robot"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
|
||||
@@ -35,6 +35,25 @@ class TerminalGo2McpServer:
|
||||
logger.error(f"Failed to publish command: {str(e)}", exc_info=True)
|
||||
return f"Failed to publish command: {str(e)}"
|
||||
|
||||
async def switch_mode(self, name: str):
|
||||
"""
|
||||
切换机器人模式。
|
||||
Args:
|
||||
- name (str): 模式名称
|
||||
Returns:
|
||||
str: 命令执行结果消息
|
||||
Raises:
|
||||
ValueError: 如果参数无效或为空
|
||||
Exception: 如果MQTT发布失败
|
||||
"""
|
||||
if name:
|
||||
try:
|
||||
params = {"name": name}
|
||||
return await self.pubCmd("msc", "SelectMode", params)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to call switch_mode mcp-tool: {str(e)} ", exc_info=True)
|
||||
raise
|
||||
|
||||
async def custom_action(self, action: str):
|
||||
"""
|
||||
机器人自定义动作展示。
|
||||
@@ -54,23 +73,25 @@ class TerminalGo2McpServer:
|
||||
logger.error(f"Failed to call action mcp-tool: {str(e)} ", exc_info=True)
|
||||
raise
|
||||
|
||||
async def save_position(self, location_name: str):
|
||||
async def ContinuousMove(self, on: bool, vx: float, vy: float, vyaw: float):
|
||||
"""
|
||||
机器人保存具体地点。
|
||||
开始连续移动。
|
||||
Args:
|
||||
- location_name (str): 地点名称
|
||||
- on (bool): 是否开始连续移动
|
||||
- vx (float): x方向速度
|
||||
- vy (float): y方向速度
|
||||
- vyaw (float): 偏航角速度
|
||||
Returns:
|
||||
str: 命令执行结果消息
|
||||
Raises:
|
||||
ValueError: 如果参数无效或为空
|
||||
Exception: 如果MQTT发布失败
|
||||
"""
|
||||
if location_name:
|
||||
try:
|
||||
params = {"location_name": location_name}
|
||||
return await self.pubCmd("nav", "save_location", params)
|
||||
params = {"on": on, "vx": vx, "vy": vy, "vyaw": vyaw}
|
||||
return await self.pubCmd("sport", "ContinuousMove", params)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to call saveLocation mcp-tool: {str(e)} ", exc_info=True)
|
||||
logger.error(f"Failed to call ContinuousMove: {str(e)}", exc_info=True)
|
||||
raise
|
||||
|
||||
# 新增OAC相关命令
|
||||
@@ -428,9 +449,24 @@ async def serve() -> None:
|
||||
# "required": ["speech"]
|
||||
# }
|
||||
# ),
|
||||
Tool(
|
||||
name="switch_mode",
|
||||
description="切换机器人模式。运控模式:ai、normal、advanced",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "模式名称",
|
||||
"minLength": 1
|
||||
}
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="custom_action",
|
||||
description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。",
|
||||
description="机器人自定义动作展示。支持多种动作包括:基础动作(Damp阻尼、BalanceStand平衡站立、StandUp站起、StandDown蹲下、Sit坐下、RiseSit站起(相对于坐下)、RecoveryStand恢复站立)、表演动作(Hello打招呼、Stretch伸展、Content满足、Heart爱心、Scrape刮擦)、高难度动作(FrontFlip前空翻、BackFlip后空翻、LeftFlip左空翻、FrontJump前跳、FrontPounce前扑、Dance1舞蹈1、Dance2舞蹈2)、自由模式(FreeWalk自由行走、FreeBound自由跳跃、FreeJump自由跳跃、FreeAvoid自由避障、WalkUpright直立行走、CrossStep交叉步)等。",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -443,21 +479,36 @@ async def serve() -> None:
|
||||
"required": ["action"]
|
||||
}
|
||||
),
|
||||
# Tool(
|
||||
# name="save_position",
|
||||
# description="机器人保存具体地点。",
|
||||
# inputSchema={
|
||||
# "type": "object",
|
||||
# "properties": {
|
||||
# "location_name": {
|
||||
# "type": "string",
|
||||
# "description": "地点名称",
|
||||
# "minLength": 1
|
||||
# }
|
||||
# },
|
||||
# "required": ["location_name"]
|
||||
# }
|
||||
# ),
|
||||
Tool(
|
||||
name="StartContinuousMove",
|
||||
description="开始连续移动。",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"vx": {
|
||||
"type": "number",
|
||||
"description": "x方向速度"
|
||||
},
|
||||
"vy": {
|
||||
"type": "number",
|
||||
"description": "y方向速度"
|
||||
},
|
||||
"vyaw": {
|
||||
"type": "number",
|
||||
"description": "偏航角速度"
|
||||
}
|
||||
},
|
||||
"required": ["vx", "vy", "vyaw"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="StopContinuousMove",
|
||||
description="停止连续移动。",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {}
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="obstacle_avoidance_switch",
|
||||
description="开启/关闭避障功能。",
|
||||
@@ -494,50 +545,50 @@ async def serve() -> None:
|
||||
"required": ["x", "y", "yaw"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="move_to_absolute_position",
|
||||
description="移动到绝对位置。",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"x": {
|
||||
"type": "number",
|
||||
"description": "x坐标"
|
||||
},
|
||||
"y": {
|
||||
"type": "number",
|
||||
"description": "y坐标"
|
||||
},
|
||||
"yaw": {
|
||||
"type": "number",
|
||||
"description": "偏航角"
|
||||
}
|
||||
},
|
||||
"required": ["x", "y", "yaw"]
|
||||
}
|
||||
),
|
||||
Tool(
|
||||
name="move_to_increment_position",
|
||||
description="移动到相对位置。",
|
||||
inputSchema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"x": {
|
||||
"type": "number",
|
||||
"description": "x坐标增量"
|
||||
},
|
||||
"y": {
|
||||
"type": "number",
|
||||
"description": "y坐标增量"
|
||||
},
|
||||
"yaw": {
|
||||
"type": "number",
|
||||
"description": "偏航角增量"
|
||||
}
|
||||
},
|
||||
"required": ["x", "y", "yaw"]
|
||||
}
|
||||
),
|
||||
# Tool(
|
||||
# name="move_to_absolute_position",
|
||||
# description="移动到绝对位置。",
|
||||
# inputSchema={
|
||||
# "type": "object",
|
||||
# "properties": {
|
||||
# "x": {
|
||||
# "type": "number",
|
||||
# "description": "x坐标"
|
||||
# },
|
||||
# "y": {
|
||||
# "type": "number",
|
||||
# "description": "y坐标"
|
||||
# },
|
||||
# "yaw": {
|
||||
# "type": "number",
|
||||
# "description": "偏航角"
|
||||
# }
|
||||
# },
|
||||
# "required": ["x", "y", "yaw"]
|
||||
# }
|
||||
# ),
|
||||
# Tool(
|
||||
# name="move_to_increment_position",
|
||||
# description="移动到相对位置。",
|
||||
# inputSchema={
|
||||
# "type": "object",
|
||||
# "properties": {
|
||||
# "x": {
|
||||
# "type": "number",
|
||||
# "description": "x坐标增量"
|
||||
# },
|
||||
# "y": {
|
||||
# "type": "number",
|
||||
# "description": "y坐标增量"
|
||||
# },
|
||||
# "yaw": {
|
||||
# "type": "number",
|
||||
# "description": "偏航角增量"
|
||||
# }
|
||||
# },
|
||||
# "required": ["x", "y", "yaw"]
|
||||
# }
|
||||
# ),
|
||||
# Tool(
|
||||
# name="start_mapping",
|
||||
# description="开始建图。",
|
||||
@@ -770,18 +821,24 @@ async def serve() -> None:
|
||||
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
|
||||
"""处理工具调用"""
|
||||
try:
|
||||
if name == "custom_action":
|
||||
if name == "switch_mode":
|
||||
result = await mcp_server.switch_mode(arguments["name"])
|
||||
elif name == "custom_action":
|
||||
result = await mcp_server.custom_action(arguments["action"])
|
||||
elif name == "StartContinuousMove":
|
||||
result = await mcp_server.StartContinuousMove(arguments["vx"], arguments["vy"], arguments["vyaw"])
|
||||
elif name == "StopContinuousMove":
|
||||
result = await mcp_server.StopContinuousMove()
|
||||
elif name == "save_position":
|
||||
result = await mcp_server.save_position(arguments["location_name"])
|
||||
elif name == "obstacle_avoidance_switch":
|
||||
result = await mcp_server.obstacle_avoidance_switch(arguments["enable"])
|
||||
elif name == "obstacle_move":
|
||||
result = await mcp_server.obstacle_move(arguments["x"], arguments["y"], arguments["yaw"])
|
||||
elif name == "move_to_absolute_position":
|
||||
result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"])
|
||||
elif name == "move_to_increment_position":
|
||||
result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"])
|
||||
# elif name == "move_to_absolute_position":
|
||||
# result = await mcp_server.move_to_absolute_position(arguments["x"], arguments["y"], arguments["yaw"])
|
||||
# elif name == "move_to_increment_position":
|
||||
# result = await mcp_server.move_to_increment_position(arguments["x"], arguments["y"], arguments["yaw"])
|
||||
# elif name == "start_mapping":
|
||||
# result = await mcp_server.start_mapping()
|
||||
# elif name == "end_mapping":
|
||||
|
||||
Reference in New Issue
Block a user