Files
lzwcai-mcp-server-package/lzwcai_mcp_iot/lzwcai_mcp_iot/iot_device_tool.py
yuanzhipeng ec7e7fd7dc feat(lzwcai-demp-tool-server-dify-to-mcp): 初始化 Dify 集成工具模块
新增 Dify 到 MCP 的集成工具,支持通过 Dify API 将模型部署到 MCP 平台并进行推理。
该模块包含完整的服务器实现、依赖配置和命令行启动脚本。

主要功能:
- 支持 Workflow 和 Completion 模式的调用
- 自动翻译工具名称为驼峰命名格式
- 提供文件上传与任务停止接口
- 兼容流式与非流式响应处理
2025-12-16 17:52:04 +08:00

411 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
IoT设备服务器主模块
该模块实现了一个简单的IoT设备控制服务器使用FastMCP框架提供设备操作功能。
"""
import os
import json
from mcp.server.fastmcp import FastMCP
from .src.device_results_pretreatment import (
format_devices_list,
)
from .src.device_operations import DeviceOperator
from .src.vector_service import VectorService
from .src.logger_config import get_logger
from .config import CONFIG
from .src.iot_device_dicts_prompt import (
iot_get_devices_by_location_prompt,
iot_get_all_spaces_and_devices_prompt,
iot_device_precise_controller_prompt,
smart_space_device_locator_matcher_prompt,
)
from .src.init_mcp import init_mcp_server
# 延迟初始化日志器,避免在导入时立即执行
logger = None
def _ensure_logger():
"""确保日志器已初始化"""
global logger
if logger is None:
logger = get_logger(__name__)
return logger
# 创建FastMCP实例
mcp = FastMCP("iot_device_server")
# 创建设备操作实例
device_op = DeviceOperator(api_base_url=CONFIG["device_api_base_url"])
# 创建向量服务实例
vector_service = VectorService(base_url=CONFIG["vector_api_base_url"])
@mcp.tool(description=iot_get_devices_by_location_prompt())
async def iot_get_devices_by_location(
location: str,
) -> str:
logger = _ensure_logger()
try:
# 输入参数验证
if not location:
error_msg = "location参数是必需的"
logger.error(error_msg)
return json.dumps(
{"code": 400, "msg": error_msg, "data": None}, ensure_ascii=False
)
logger.info(
f"开始处理IoT设备查询请求 - 位置: {location}"
)
# 从环境变量获取企业ID已在启动时初始化
enterprise_id = os.environ.get("enterpriseId")
if not enterprise_id:
error_msg = f"企业ID未初始化请检查员工ID配置"
logger.error(error_msg)
return json.dumps(
{"code": 404, "msg": error_msg, "data": None}, ensure_ascii=False
)
logger.info(f"获取到企业ID: {enterprise_id}")
# 查询设备列表
query_result = vector_service.query_devices_by_location(
keyId=enterprise_id,
location=location
)
logger.info(f"查询设备列表结果: {query_result}")
# 提取设备列表
devices = query_result.get("devices", [])
device_count = query_result.get("count", 0)
if device_count == 0 or not devices:
return json.dumps({
"code": 404,
"msg": f"在位置「{location}」未找到任何设备",
"data": None
}, ensure_ascii=False)
# 使用格式化函数将设备列表转换为易读的文本
formatted_text = format_devices_list(devices)
logger.info(f"设备列表已格式化,设备数量: {device_count}")
# 返回包含格式化文本和原始数据的结果
result = {
"code": 200,
"msg": formatted_text, # 直接将格式化文本放在msg字段
"data": {
"devices": devices, # 原始设备数据,供后续操作使用
"count": device_count,
"location_filter": query_result.get("location_filter", location)
}
}
return json.dumps(result, ensure_ascii=False)
except Exception as e:
error_msg = f"IoT设备查询过程中发生异常: {str(e)}"
logger.error(error_msg, exc_info=True)
return json.dumps(
{"code": 500, "msg": error_msg, "data": None}, ensure_ascii=False
)
@mcp.tool(description=iot_get_all_spaces_and_devices_prompt())
async def iot_get_all_spaces_and_devices() -> str:
logger = _ensure_logger()
try:
logger.info("开始处理获取所有空间位置信息请求")
# 从环境变量获取企业ID已在启动时初始化
enterprise_id = os.environ.get("enterpriseId")
if not enterprise_id:
error_msg = f"企业ID未初始化请检查员工ID配置"
logger.error(error_msg)
return json.dumps(
{"code": 404, "msg": error_msg, "data": None}, ensure_ascii=False
)
logger.info(f"获取到企业ID: {enterprise_id}")
# 查询所有设备(不指定位置,获取所有)
query_result = vector_service.query_devices_by_location(
keyId=enterprise_id,
location="" # 空字符串表示获取所有
)
logger.info(f"查询所有设备结果: {query_result}")
# 提取设备列表
devices = query_result.get("devices", [])
device_count = query_result.get("count", 0)
if device_count == 0 or not devices:
return json.dumps({
"code": 404,
"msg": "系统中未找到任何设备和空间",
"data": None
}, ensure_ascii=False)
# 提取所有唯一的空间名称
spaces_set = set()
for device in devices:
# 提取空间信息
location_desc = device.get("location_desc", "")
if location_desc and location_desc != "未知空间":
spaces_set.add(location_desc)
# 转换为列表并排序
spaces_list = sorted(list(spaces_set))
if not spaces_list:
return json.dumps({
"code": 404,
"msg": "系统中未找到任何有效空间",
"data": None
}, ensure_ascii=False)
# 构建格式化文本
formatted_lines = []
formatted_lines.append("=" * 60)
formatted_lines.append(f"所有空间位置信息")
formatted_lines.append("=" * 60)
formatted_lines.append(f"空间总数: {len(spaces_list)}")
formatted_lines.append("")
# 列出所有空间
for space_idx, space_name in enumerate(spaces_list, 1):
formatted_lines.append(f"{space_idx}. {space_name}")
formatted_lines.append("")
formatted_lines.append("=" * 60)
formatted_text = "\n".join(formatted_lines)
logger.info(f"已获取所有空间位置信息,空间数: {len(spaces_list)}")
# 返回包含格式化文本和空间列表的结果
result = {
"code": 200,
"msg": formatted_text,
"data": {
"spaces": spaces_list, # 空间名称列表
"space_count": len(spaces_list)
}
}
return json.dumps(result, ensure_ascii=False)
except Exception as e:
error_msg = f"获取所有空间位置信息过程中发生异常: {str(e)}"
logger.error(error_msg, exc_info=True)
return json.dumps(
{"code": 500, "msg": error_msg, "data": None}, ensure_ascii=False
)
@mcp.tool(description=iot_device_precise_controller_prompt())
async def iot_device_precise_controller(
entityId: str,
command: str,
params: dict = None,
userId: str = None,
) -> str:
logger = _ensure_logger()
logger.info("=" * 60)
logger.info("调用iot_device_precise_controller工具")
logger.info(f"参数 - entityId: {entityId}, command: {command}, params: {params}, userId: {userId}")
logger.info("=" * 60)
try:
# 输入参数验证
logger.debug("开始参数验证...")
if not all([entityId, command]):
error_msg = "所有参数都是必需的: entityId,command"
logger.error(f"参数验证失败: {error_msg}")
return json.dumps(
{"code": 400, "msg": error_msg, "data": None}, ensure_ascii=False
)
logger.debug("参数验证通过")
# 从环境变量获取企业ID已在启动时初始化
enterprise_id = os.environ.get("enterpriseId")
if not enterprise_id:
error_msg = f"企业ID未初始化请检查员工ID配置"
logger.error(error_msg)
return json.dumps(
{"code": 404, "msg": error_msg, "data": None}, ensure_ascii=False
)
logger.info(f"获取到企业ID: {enterprise_id}")
map_result = {
"enterpriseId": enterprise_id,
"entityId": entityId,
"command": command,
"params": params if params is not None else {},
}
# 如果提供了userId则添加到map_result中
if userId:
map_result["userId"] = userId
# 操作设备
result = device_op.operate_device(map_result)
logger.info(f"设备操作结果: {result}")
# 将结果字典转换为JSON字符串
if isinstance(result, dict):
result = json.dumps(result, ensure_ascii=False)
logger.info("iot_device_precise_controller工具执行完成")
logger.info("=" * 60)
return result
except Exception as e:
error_msg = f"IoT设备操作过程中发生异常: {str(e)}"
logger.error(error_msg, exc_info=True)
logger.error("iot_device_precise_controller工具执行失败")
logger.error("=" * 60)
return json.dumps(
{"code": 500, "msg": error_msg, "data": None}, ensure_ascii=False
)
@mcp.tool(description=smart_space_device_locator_matcher_prompt())
async def smart_space_device_locator_matcher(
userId: str,
) -> str:
logger = _ensure_logger()
try:
if not userId:
error_msg = "所有参数都是必需的: userId"
logger.error(error_msg)
return json.dumps(
{"code": 400, "msg": error_msg, "data": None}, ensure_ascii=False
)
result = device_op.get_user_spaces_by_user_id(userId)
if isinstance(result, dict):
# 检查返回的data是否为None如果是则提供友好的描述
if result.get("data") is None:
result["msg"] = "我目前还没发现您在哪里 请您告诉我你所属位置;"
return json.dumps(result, ensure_ascii=False)
return result
except Exception as e:
error_msg = f"定位空间过程中发生异常: {str(e)}"
logger.error(error_msg, exc_info=True)
return json.dumps(
{"code": 500, "msg": error_msg, "data": None}, ensure_ascii=False
)
def testFn() -> str:
logger = _ensure_logger()
try:
# 读取test.json文件
test_file_path = r"E:\yh-ai\project\lzwcai-szyg\lzwcai-mcp-server-package\lzwcai_mcp_iot\test.json"
with open(test_file_path, "r", encoding="utf-8") as f:
test_data = json.load(f)
# 获取results数据
results = test_data.get("results", [])
logger.info(f"从test.json读取到的results数据: {results}")
# 参数预处理
map_result = device_op.preprocess_results(results)
logger.info(f"参数预处理结果: {map_result}")
# 保存map_result到本地JSON文件
output_file_path = r"E:\yh-ai\project\lzwcai-szyg\lzwcai-mcp-server-package\lzwcai_mcp_iot\map_result.json"
with open(output_file_path, "w", encoding="utf-8") as f:
json.dump(map_result, f, ensure_ascii=False, indent=2)
logger.info(f"map_result已保存到: {output_file_path}")
return json.dumps(map_result, ensure_ascii=False)
except FileNotFoundError:
error_msg = "test.json文件未找到"
logger.error(error_msg)
return json.dumps(
{"code": 404, "msg": error_msg, "data": None}, ensure_ascii=False
)
except json.JSONDecodeError as e:
error_msg = f"test.json文件格式错误: {str(e)}"
logger.error(error_msg)
return json.dumps(
{"code": 400, "msg": error_msg, "data": None}, ensure_ascii=False
)
except Exception as e:
error_msg = f"测试过程中发生异常: {str(e)}"
logger.error(error_msg, exc_info=True)
return json.dumps(
{"code": 500, "msg": error_msg, "data": None}, ensure_ascii=False
)
def main():
"""主函数启动MCP服务器"""
logger = _ensure_logger()
try:
logger.info("=" * 80)
logger.info("IoT设备MCP服务器启动流程开始")
logger.info(f"配置信息: {CONFIG}")
logger.info("=" * 80)
# 初始化组件
logger.info("正在初始化核心组件...")
logger.info(f"设备操作器API地址: {CONFIG['device_api_base_url']}")
logger.info(f"向量服务API地址: {CONFIG['vector_api_base_url']}")
logger.info("核心组件初始化完成")
# 获取企业ID并初始化MCP服务器
enterprise_id = CONFIG.get("enterprise_id")
if enterprise_id:
logger.info(f"检测到企业ID配置: {enterprise_id}")
logger.info("开始初始化MCP服务器...")
init_result = init_mcp_server(device_op, vector_service, enterprise_id)
logger.info(f"MCP服务器初始化结果: {init_result}")
if init_result.get("code") != 200:
logger.warning(f"MCP服务器初始化失败但服务器仍将启动: {init_result}")
else:
logger.info("MCP服务器初始化成功")
# 保存企业ID到环境变量
returned_enterprise_id = init_result.get("data", {}).get("enterprise_id")
if returned_enterprise_id:
device_op.set_enterprise_id_to_env(returned_enterprise_id)
logger.info(f"企业ID已保存到环境变量: {returned_enterprise_id}")
else:
# 如果返回结果中没有enterprise_id使用配置中的
device_op.set_enterprise_id_to_env(enterprise_id)
logger.info(f"企业ID已保存到环境变量: {enterprise_id}")
else:
logger.warning("未配置企业ID跳过预初始化")
logger.info("提示您可以在配置文件或环境变量中设置ENTERPRISE_ID来启用自动初始化功能")
# 注册的工具列表日志
logger.info("已注册的MCP工具:")
logger.info("1. iot_get_devices_by_location - 根据位置获取设备列表")
logger.info("2. iot_get_all_spaces_and_devices - 获取所有空间位置信息")
logger.info("3. iot_device_precise_controller - IoT设备精确控制")
logger.info("4. smart_space_device_locator_matcher - 智能空间设备定位")
logger.info("=" * 80)
logger.info("MCP服务器即将启动等待客户端连接...")
logger.info("传输方式: stdio (标准输入输出)")
logger.info("注意: 控制台日志已禁用,所有日志将写入文件")
logger.info("=" * 80)
# 使用标准输入输出作为传输方式运行服务器
mcp.run(transport="stdio")
except Exception as e:
logger.error("=" * 80)
logger.error(f"服务器启动失败: {str(e)}")
logger.error("错误详情:", exc_info=True)
logger.error("=" * 80)
raise
if __name__ == "__main__":
main()