Files
lzwcai-mcp/terminal_temi_mcp/terminal_temi_mcp/main.py

499 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Optional, Sequence, List
import logging
from fastapi import FastAPI
from pydantic import BaseModel
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import asyncio
import json
import time
import uuid
from .mcp_mqtt import get_mcpmqtt_handler
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TourStop(BaseModel):
name: str
text: str
class NavServer:
def __init__(self, mmhandler=None):
self.mmhandler = mmhandler or get_mcpmqtt_handler()
async def publish_Cmd(self, device_id: str, user_id: str, cmd: str, params: dict):
try:
payload = {
"device_id": device_id,
"user_id": user_id,
"cmd": cmd,
"data": params["data"]
}
self.mmhandler.client.publish("robot/cmd", json.dumps(payload), qos=2)
return f"{cmd} 任务已经下达完成"
except Exception as e:
logger.error(f"Failed to publish command: {str(e)}", exc_info=True)
return f"Failed to publish command: {str(e)}"
async def nav_to(self, user_id: str, location: str, flag: bool):
"""轮足导航到指定位置"""
try:
if not location:
return "Location is not specified."
else:
params = {
"data": {"location": location, "flag": flag}
}
return await self.publish_Cmd("123456", user_id, "nav", params)
except Exception as e:
logger.error(f"Failed to call navigation mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call navigation mcp-tool: {str(e)}"
async def speak(self, user_id: str, speech: str):
"""轮足机器人语音播报"""
try:
if not speech:
return "Speech content is not specified."
else:
params = {
"data": {"speech": speech}
}
return await self.publish_Cmd("123456", user_id, "speak", params)
except Exception as e:
logger.error(f"Failed to call speak mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call speak mcp-tool: {str(e)}"
async def reception(self, user_id: str, location: str):
"""轮足机器人移动到指定位置迎宾"""
try:
if not location:
return "Location is not specified."
else:
params = {
"data": {
"location": location
}
}
return await self.publish_Cmd("123456", user_id, "reception", params)
except Exception as e:
logger.error(f"Failed to call reception mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call reception mcp-tool: {str(e)}"
async def notification(self, user_id: str, location: str, text: str):
"""轮足机器人移动到指定位置通知"""
try:
if not location or not text:
return "Location or text is not specified."
else:
params = {
"data": {
"location": location,
"text": text
}
}
return await self.publish_Cmd("123456", user_id, "notification", params)
except Exception as e:
logger.error(f"Failed to call notification mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call notification mcp-tool: {str(e)}"
async def repose(self, user_id: str):
"""轮足机器人重新定位"""
try:
params = {
"data": {
"action": "repose"
}
}
return await self.publish_Cmd("123456", user_id, "map", params)
except Exception as e:
logger.error(f"Failed to call repose mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call repose mcp-tool: {str(e)}"
async def delivery(self, user_id: str, first_location: str, next_location: str):
"""轮足机器人配送"""
try:
if not first_location or not next_location:
return "first_location or next_location is not specified."
else:
params = {
"data": {
"first_location": first_location,
"next_location": next_location
}
}
return await self.publish_Cmd("123456", user_id, "delivery", params)
except Exception as e:
logger.error(f"Failed to call delivery mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call delivery mcp-tool: {str(e)}"
async def patrol(self, user_id: str, locations: list):
"""轮足机器人巡逻"""
try:
if not locations:
return "locations is not specified."
else:
params = {
"data": {"locations": locations}
}
return await self.publish_Cmd("123456", user_id, "patrol", params)
except Exception as e:
logger.error(f"Failed to call patrol mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call patrol mcp-tool: {str(e)}"
async def startFaceRecognize(self, device_id: str, user_id: str):
"""人脸识别"""
try:
if not device_id:
return "device_id is not specified."
else:
params = {
"data": {}
}
return await self.publish_Cmd("123456", user_id, "face", params)
except Exception as e:
logger.error(f"Failed to call face recognize mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call face recognize mcp-tool: {str(e)}"
async def scanQRCode(self, device_id: str, user_id: str):
"""扫描二维码"""
try:
if not device_id:
return "device_id is not specified."
else:
params = {
"data": {}
}
return await self.publish_Cmd("123456", user_id, "qrcode", params)
except Exception as e:
logger.error(f"Failed to call QR code scan mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call QR code scan mcp-tool: {str(e)}"
async def save_position(self, user_id: str, location_name: str):
"""保存当前位置"""
try:
if not location_name:
return "location is not specified."
else:
params = {
"data": {"location_name": location_name}
}
return await self.publish_Cmd("123456", user_id, "saveLocation", params)
except Exception as e:
logger.error(f"Failed to call save_position mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call save_position mcp-tool: {str(e)}"
async def guide(self, user_id: str, datas: List[TourStop]):
"""引导"""
try:
if not datas:
return "datas is not specified."
params = {
"data": [data.model_dump() for data in datas]
}
return await self.publish_Cmd("123456", user_id, "guide", params)
except Exception as e:
logger.error(f"Failed to call guide mcp-tool: {str(e)} ", exc_info=True)
return f"Failed to call guide mcp-tool: {str(e)}"
async def serve() -> None:
server = Server("terminal_temi_mcp")
mmhandler = get_mcpmqtt_handler()
nav_server = NavServer(mmhandler)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""列出所有工具"""
return [
Tool(
name="nav_to",
description="轮足机器人导航到指定地点为用户引路。支持普通引导和通知两种模式。普通引导用于一般带路场景;通知模式下,机器人到达后会播放提示。适用于带路、引路、前往某个位置等场景。触发关键词:带我去、导航、引路、带路、怎么走、在哪里。",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "目标地点名称",
"minLength": 1
},
"flag": {
"type": "boolean",
"description": "是否为通知类型的导航。True为通知False为普通引导默认"
},
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["location", "user_id"]
}
),
Tool(
name="speak",
description="轮足机器人进行语音播报:告诉、提醒、告知、提示、通知",
inputSchema={
"type": "object",
"properties": {
"speech": {
"type": "string",
"description": "要播报的语音内容",
"minLength": 1
},
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["speech", "user_id"]
}
),
Tool(
name="reception",
description="轮足机器人去接待客人:去接人、请迎接客人、去接待、迎接一下、带人过来、把客人带过来、去门口接人、去接一下、请接人、接客人、帮我迎接、去把人接过来、接待来访者、请去接一下客人、送客人过来、从XX接到YY、引导访客、带访客过来、请引领客人、去门口把人接进来、从入口接人、去接待处接人、带客人进来",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "引导接待客人到这个位置",
"minLength": 1
},
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["location", "user_id"]
}
),
# Tool(
# name="notification",
# description="轮足机器人去通知事情:通知、告诉、提醒、告知、提示",
# inputSchema={
# "type": "object",
# "properties": {
# "location": {
# "type": "string",
# "description": "去这个位置通知事情",
# "minLength": 1
# },
# "text": {
# "type": "string",
# "description": "通知事情的内容",
# "minLength": 1
# },
# "user_id": {
# "type": "string",
# "description": "用户ID"
# }
# },
# "required": ["location","text", "user_id"]
# }
# ),
Tool(
name="repose",
description="轮足机器人、助手、机器人去重新定位",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["user_id"]
}
),
Tool(
name="delivery",
description="轮足机器人去运输物品:帮我送东西、去送一下、去送物品、帮我拿过去、送这个过去、去送快递、运送一下、帮我带东西、把东西送过去、物品配送、拿去给XX、去取个东西、取完送过去、帮我捎个东西、把这个拿去、请运过去、送过去给谁、请配送一下、帮我转交、帮我送到、从XX拿到YY、物品搬运",
inputSchema={
"type": "object",
"properties": {
"first_location": {
"type": "string",
"description": "去这个位置取物品",
"minLength": 1
},
"next_location": {
"type": "string",
"description": "将物品运输、送到这个位置",
"minLength": 1
},
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["first_location","next_location", "user_id"]
}
),
Tool(
name="patrol",
description="轮足机器人、助手、机器人去巡逻:巡逻、巡查、去检查一下、去看看、去巡视",
inputSchema={
"type": "object",
"properties": {
"locations": {
"type": "array",
"description": "机器人巡逻经过的地点列表",
"items": {
"type": "string"
}
},
"user_id": {
"type": "string",
"description": "用户ID"
}
},
"required": ["locations", "user_id"]
}
),
# Tool(
# name="guide",
# description="轮足机器人进行导览服务:导览、介绍、参观、带领参观、导游、讲解、展示、演示、介绍场所、带我看看",
# inputSchema={
# "type": "object",
# "properties": {
# "user_id": {
# "type": "string",
# "description": "用户ID"
# }
# },
# "required": ["user_id"]
# }
# )
# Tool(
# name="face",
# description="轮足机器人进行人脸识别:识别人脸、人脸识别、脸部识别、面部识别、认脸、看看是谁、识别一下、看看这个人、辨识人脸、扫脸、人脸检测、看脸识别、脸部检测、识别面孔、看人脸、人脸扫描、脸识别、面孔识别、认人、识别身份、看看这是谁、人脸认证、脸部扫描、面部扫描",
# inputSchema={
# "type": "object",
# "properties": {
# "device_id": {
# "type": "string",
# "description": "设备ID",
# "minLength": 1
# },
# "user_id": {
# "type": "string",
# "description": "用户ID"
# }
# },
# "required": ["device_id", "user_id"]
# }
# ),
# Tool(
# name="qrcode",
# description="轮足机器人进行二维码扫描:扫码、扫二维码、扫QR码、二维码识别、码扫描、扫一下码、读取二维码、识别二维码、二维码扫描、QR扫描、扫描条码、扫描编码、读码、解析二维码、扫描识别、码识别、条码扫描、扫描二维码、读取码、扫描条形码、二维码读取、码解析",
# inputSchema={
# "type": "object",
# "properties": {
# "device_id": {
# "type": "string",
# "description": "设备ID",
# "minLength": 1
# },
# "task_id": {
# "type": "string",
# "description": "任务ID"
# },
# "user_id": {
# "type": "string",
# "description": "用户ID"
# }
# },
# "required": ["device_id", "user_id"]
# }
# )
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
"""处理工具调用"""
try:
result = ""
if name == "nav_to":
if "location" not in arguments or "user_id" not in arguments:
raise ValueError("缺少必要参数: location or user_id")
result = await nav_server.nav_to(
user_id=arguments["user_id"],
location=arguments["location"],
flag=arguments.get("flag", False)
)
elif name == "speak":
if "speech" not in arguments or "user_id" not in arguments:
raise ValueError("缺少必要参数: speech or user_id")
result = await nav_server.speak(
user_id=arguments["user_id"],
speech=arguments["speech"]
)
elif name == "reception":
if "location" not in arguments or "user_id" not in arguments:
raise ValueError("缺少必要参数: location or user_id")
result = await nav_server.reception(
user_id=arguments["user_id"],
location=arguments["location"]
)
# elif name == "notification":
# if "location" not in arguments or "text" not in arguments or "user_id" not in arguments:
# raise ValueError("缺少必要参数: location、text or user_id")
# result = await nav_server.notification(
# user_id=arguments["user_id"],
# location=arguments["location"],
# text=arguments["text"]
# )
elif name == "repose":
if "user_id" not in arguments:
raise ValueError("缺少必要参数: user_id")
result = await nav_server.repose(user_id=arguments["user_id"])
elif name == "delivery":
if "first_location" not in arguments or "next_location" not in arguments or "user_id" not in arguments:
raise ValueError("缺少必要参数: first_location、next_location or user_id")
result = await nav_server.delivery(
user_id=arguments["user_id"],
first_location=arguments["first_location"],
next_location=arguments["next_location"]
)
elif name == "patrol":
if "locations" not in arguments or "user_id" not in arguments:
raise ValueError("缺少必要参数: locations or user_id")
result = await nav_server.patrol(
user_id=arguments["user_id"],
locations=arguments["locations"]
)
# elif name == "face":
# if "device_id" not in arguments or "user_id" not in arguments:
# raise ValueError("缺少必要参数: device_id or user_id")
# result = await nav_server.startFaceRecognize(
# device_id=arguments["device_id"],
# user_id=arguments["user_id"]
# )
# elif name == "qrcode":
# if "device_id" not in arguments or "user_id" not in arguments:
# raise ValueError("缺少必要参数: device_id or user_id")
# result = await nav_server.scanQRCode(
# device_id=arguments["device_id"],
# user_id=arguments["user_id"]
# )
else:
raise ValueError(f"未知工具: {name}")
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error(f"工具调用失败: {str(e)}")
raise ValueError(f"执行失败: {str(e)}")
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
def main():
asyncio.run(serve())
if __name__ == "__main__":
main()