Files
lzwcai-mcp/temi_mcpserver/temiRobot/temi_mcpserver.py

490 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Optional, Sequence
import logging
from fastapi import FastAPI
from pydantic import BaseModel
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
import time
import uuid
from .mcp_mqtt import get_mcpmqtt_handler
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TourStop(BaseModel):
name: str
text: str
class NavServer:
def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mcpmqtt_handler()
async def publish_Cmd(self, device_id: str, cmd: str, params: dict):
try:
payload = {
"device_id": device_id,
"cmd": cmd,
"data": params["data"]
}
self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2)
return f"{cmd}工具执行成功"
except Exception as e:
logger.error(f"Failed to publish {cmd} command: {str(e)}", exc_info=True)
raise
async def nav_to(self, location: str):
"""
机器人导航去某个地方。
Args:
- location (str): 目标地点名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if location:
try:
if location == "充电桩":
location = "home base"
params = {
"data": {"location": location}
}
return await self.publish_Cmd("123456", "nav", params)
except Exception as e:
logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True)
raise
async def speak(self, speech: str):
"""
机器人说某些话。
Args:
- speech (str): 说话内容
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if speech:
try:
params = {
"data": {"speech": speech}
}
return await self.publish_Cmd("123456", "speak", params)
except Exception as e:
logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True)
raise
async def reception(self, location: str):
"""
机器人接待客人。
Args:
- location (str): 机器人去这个位置接待客人
- task_id (Optional[str]): 任务id不传则自动生成
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if location :
try:
params = {
"data": {
"location": location
}
}
return await self.publish_Cmd("123456", "reception", params)
except Exception as e:
logger.error(f"Failed to call reception mcp-tool: {str(e)} ", exc_info=True)
raise
async def notification(self, location: str, text: str):
"""
机器人通知事情。
Args:
- location (str): 机器人去这个位置通知事情
- text (str): 通知事情的内容
- task_id (Optional[str]): 任务id不传则自动生成
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if location :
try:
params = {
"data": {
"location": location,
"text": text
}
}
return await self.publish_Cmd("123456", "notification", params)
except Exception as e:
logger.error(f"Failed to call notification mcp-tool: {str(e)} ", exc_info=True)
raise
async def repose(self):
"""
机器人重新定位。
"""
try:
params = {
"data": {
"action": "repose"
}
}
return await self.publish_Cmd("123456", "map", params)
except Exception as e:
logger.error(f"Failed to call repose mcp-tool: {str(e)} ", exc_info=True)
raise
async def delivery(self, first_location: str, next_location: str):
"""
机器人运送物品。
Args:
- first_location (str): 机器人去这个位置取物品
- next_location (str): 机器人将物品送到这个位置
- task_id (Optional[str]): 任务id不传则自动生成
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if first_location and next_location:
try:
params = {
"data": {
"first_location": first_location,
"next_location": next_location
}
}
return await self.publish_Cmd("123456", "delivery", params)
except Exception as e:
logger.error(f"Failed to call delivery mcp-tool: {str(e)} ", exc_info=True)
raise
async def startFaceRecognize(self, device_id: str):
"""
机器人进行人脸识别。
Args:
- device_id (str): 设备id
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if device_id:
try:
params = {
"data": {}
}
return await self.publish_Cmd("123456", "face", params)
except Exception as e:
logger.error(f"Failed to call face recognize mcp-tool: {str(e)} ", exc_info=True)
raise
async def scanQRCode(self, device_id: str):
"""
机器人进行二维码扫描。
Args:
- device_id (str): 设备id
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if device_id:
try:
params = {
"data": {}
}
return await self.publish_Cmd("123456", "qrcode", params)
except Exception as e:
logger.error(f"Failed to call QR code scan mcp-tool: {str(e)} ", exc_info=True)
raise
async def save_position(self, device_id: str, location_name: str):
"""
机器人保存具体地点。
Args:
- device_id (str): 设备id
- location_name (str): 地点名称
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
if device_id and location_name:
try:
params = {
"data": {"location_name": location_name}
}
return await self.publish_Cmd("123456", "saveLocation", params)
except Exception as e:
logger.error(f"Failed to call save_position mcp-tool: {str(e)} ", exc_info=True)
raise
async def guide(self):
"""
机器人导览。
Args:
Returns:
str: 命令执行结果消息
Raises:
ValueError: 如果参数无效或为空
Exception: 如果MQTT发布失败
"""
try:
datas = [
TourStop(name="前台", text="欢迎来到我们的场所,这是入口区域"),
TourStop(name="灵泽万川展厅", text="这是我们的主大厅,提供接待服务"),
TourStop(name="爱易拍展厅", text="这里展示我们的主要产品和成果")
]
params = {
"data": [data.model_dump() for data in datas]
}
return await self.publish_Cmd("123456", "guide", params)
except Exception as e:
logger.error(f"Failed to call guide mcp-tool: {str(e)} ", exc_info=True)
raise
async def serve() -> None:
server = Server("temi_mcp")
mmhandler = get_mcpmqtt_handler()
nav_server = NavServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
Tool(
name="nav_to",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人导航带路到指定地点:带我去、去XX、请引路、导航去、我想去、能带我去吗、带路、送我去、在哪里、前往XX、去XX位置、带我过去、请带我到、我想前往、去那个地方、我要去、能不能去、带我导航、这个地方怎么走、我想找XX、导航带我去、请带路到、我要去那边、走一趟去、去下XX、到XX怎么走",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "目标地点名称",
"minLength": 1
}
},
"required": ["location"]
}
),
Tool(
name="speak",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人进行语音播报:告诉、提醒、告知、提示",
inputSchema={
"type": "object",
"properties": {
"speech": {
"type": "string",
"description": "要播报的语音内容",
"minLength": 1
}
},
"required": ["speech"]
}
),
Tool(
name="reception",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人去接待客人:去接人、请迎接客人、去接待、迎接一下、带人过来、把客人带过来、去门口接人、去接一下、请接人、接客人、帮我迎接、去把人接过来、接待来访者、请去接一下客人、送客人过来、从XX接到YY、引导访客、带访客过来、请引领客人、去门口把人接进来、从入口接人、去接待处接人、带客人进来",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "引导接待客人到这个位置",
"minLength": 1
},
},
"required": ["location"]
}
),
Tool(
name="notification",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人去通知事情:通知、告诉、提醒、告知、提示",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "去这个位置通知事情",
"minLength": 1
},
"text": {
"type": "string",
"description": "通知事情的内容",
"minLength": 1
}
},
"required": ["location","text"]
}
),
Tool(
name="repose",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人去重新定位",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="delivery",
description="数字员工、终端、具身机器人、前台机器人、助手、机器人去运输物品:帮我送东西、去送一下、去送物品、帮我拿过去、送这个过去、去送快递、运送一下、帮我带东西、把东西送过去、物品配送、拿去给XX、去取个东西、取完送过去、帮我捎个东西、把这个拿去、请运过去、送过去给谁、请配送一下、帮我转交、帮我送到、从XX拿到YY、物品搬运",
inputSchema={
"type": "object",
"properties": {
"first_location": {
"type": "string",
"description": "去这个位置取物品",
"minLength": 1
},
"next_location": {
"type": "string",
"description": "将物品运输、送到这个位置",
"minLength": 1
}
},
"required": ["first_location","next_location"]
}
),
# Tool(
# name="guide",
# description="数字员工、终端、具身机器人、前台机器人、助手、机器人进行导览服务:导览、介绍、参观、带领参观、导游、讲解、展示、演示、介绍场所、带我看看",
# inputSchema={
# "type": "object",
# "properties": {
# },
# "required": []
# }
# )
# Tool(
# name="face",
# description="数字员工、终端、具身机器人、前台机器人、助手、机器人进行人脸识别:识别人脸、人脸识别、脸部识别、面部识别、认脸、看看是谁、识别一下、看看这个人、辨识人脸、扫脸、人脸检测、看脸识别、脸部检测、识别面孔、看人脸、人脸扫描、脸识别、面孔识别、认人、识别身份、看看这是谁、人脸认证、脸部扫描、面部扫描",
# inputSchema={
# "type": "object",
# "properties": {
# "device_id": {
# "type": "string",
# "description": "设备ID",
# "minLength": 1
# },
# },
# "required": ["device_id"]
# }
# ),
# Tool(
# name="qrcode",
# description="数字员工、终端、具身机器人、前台机器人、助手、机器人进行二维码扫描:扫码、扫二维码、扫QR码、二维码识别、码扫描、扫一下码、读取二维码、识别二维码、二维码扫描、QR扫描、扫描条码、扫描编码、读码、解析二维码、扫描识别、码识别、条码扫描、扫描二维码、读取码、扫描条形码、二维码读取、码解析",
# inputSchema={
# "type": "object",
# "properties": {
# "device_id": {
# "type": "string",
# "description": "设备ID",
# "minLength": 1
# },
# "task_id": {
# "type": "string",
# "description": "任务ID"
# }
# },
# "required": ["device_id"]
# }
# )
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
result = ""
if name == "nav_to":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.nav_to(
location=arguments["location"]
)
elif name == "speak":
if "speech" not in arguments:
raise ValueError("缺少必要参数: speech")
result = await nav_server.speak(
speech=arguments["speech"]
)
elif name == "reception":
if "location" not in arguments:
raise ValueError("缺少必要参数: location")
result = await nav_server.reception(
location=arguments["location"],
)
elif name == "notification":
if "location" not in arguments or "text" not in arguments:
raise ValueError("缺少必要参数: location或text")
result = await nav_server.notification(
location=arguments["location"],
text=arguments["text"]
)
elif name == "repose":
result = await nav_server.repose()
elif name == "delivery":
if "first_location" not in arguments or "next_location" not in arguments:
raise ValueError("缺少必要参数: first_location或next_location")
result = await nav_server.delivery(
first_location=arguments["first_location"],
next_location=arguments["next_location"]
)
# elif name == "face":
# if "device_id" not in arguments:
# raise ValueError("缺少必要参数: device_id")
# result = await nav_server.startFaceRecognize(
# device_id=arguments["device_id"],
# )
# elif name == "qrcode":
# if "device_id" not in arguments:
# raise ValueError("缺少必要参数: device_id")
# result = await nav_server.scanQRCode(
# device_id=arguments["device_id"],
# )
else:
raise ValueError(f"未知工具: {name}")
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"工具调用失败: {str(e)}")
raise ValueError(f"执行失败: {str(e)}")
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main():
asyncio.run(serve())
if __name__ == "__main__":
main()