Add initial implementation of Terminal DHR MCP Skill Package including environment configuration, server setup, visitor management, and door control functionalities. Added README, requirements, and configuration files for project setup.

This commit is contained in:
2025-09-26 11:39:12 +08:00
parent 0b63355f16
commit fa6a044d75
11 changed files with 1647 additions and 0 deletions

17
terminal_dhr_mcp/.env Normal file
View File

@@ -0,0 +1,17 @@
# Terminal DHR MCP Skill Package Environment Variables Configuration Example
# Copy this file to .env and modify the configuration values
# Visitor system API configuration
BASE_URL=http://192.168.2.236:8088
VISITOR_API_TIMEOUT=30
# Log level (DEBUG, INFO, WARNING, ERROR)
LOG_LEVEL=INFO
# Optional: Custom API endpoints (if different from default)
# VISITOR_QUERY_ENDPOINT=/system/visitorRecord/mcp/validRecordsByUserId
# VISITOR_REGISTER_ENDPOINT=/visitors
# Optional: Custom request headers
# API_KEY=your-api-key-here
# AUTH_TOKEN=your-auth-token-here

520
terminal_dhr_mcp/README.md Normal file
View File

@@ -0,0 +1,520 @@
# Terminal DHR MCP Server
[![Python 3.8+](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![MCP Protocol](https://img.shields.io/badge/MCP-Protocol-green.svg)](https://modelcontextprotocol.io/)
[![Version](https://img.shields.io/badge/version-0.1.7-blue.svg)](https://pypi.org/project/terminal-dhr-mcp/)
一个基于 [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) 的智能终端服务器提供访客管理和门控制功能。支持通过AI助手进行访客信息查询、注册和门禁控制操作。
## ✨ 功能特性
### 🏢 访客管理系统
- **访客记录查询**: 根据用户ID查询访客预约记录支持历史数据检索
- **访客信息注册**: 在访客系统中注册新的访客信息,包含完整的验证机制
- **数据验证**: 完整的输入参数验证和错误处理,确保数据完整性
- **API集成**: 与现有访客管理系统无缝集成
### 🚪 智能门控制
- **远程门控制**: 通过Home Assistant API控制门的开关支持多种门类型
- **状态监控**: 实时获取门的当前状态信息,包括开关状态和错误信息
- **灵活配置**: 支持自定义实体ID和API端点适配不同环境
- **安全控制**: 内置安全验证机制,防止误操作
### 🔧 技术特性
- **MCP协议**: 完全兼容Model Context Protocol标准支持最新版本
- **异步处理**: 基于asyncio的高性能异步架构支持并发操作
- **配置管理**: 支持环境变量和配置文件,灵活的环境适配
- **日志系统**: 完整的操作日志和错误追踪,便于调试和监控
- **错误处理**: 完善的异常处理和重试机制,提高系统稳定性
- **类型安全**: 基于Pydantic的强类型验证确保数据安全
## 🚀 快速开始
### 环境要求
- Python 3.8 - 3.12
- Home Assistant服务器用于门控制功能
- 访客管理系统API可选
### 安装方式
#### 方式一使用pip安装
```bash
pip install terminal-dhr-mcp
```
#### 方式二:从源码安装
```bash
git clone <repository-url>
cd terminal_dhr_mcp
pip install -e .
```
#### 方式三使用uv安装推荐
```bash
# 使用uvx直接运行
uvx terminal-dhr-mcp
# 或安装到环境
uv add terminal-dhr-mcp
```
### 基础配置
1. **创建配置文件**
```bash
# 创建环境变量文件
cp .env.example .env
```
2. **配置环境变量**
```env
# 访客系统配置
BASE_URL=http://192.168.2.236:8088
VISITOR_API_TIMEOUT=30
# Home Assistant配置
HA_URL=http://192.168.1.100:8123
HA_TOKEN=your_long_lived_access_token
DOOR_ENTITY_ID=switch.door_switch
# 日志配置
LOG_LEVEL=INFO
```
3. **配置MCP客户端**
创建 `mcp-server-dhr.json` 配置文件:
```json
{
"mcpServers": {
"terminal-dhr-mcp": {
"disabled": false,
"type": "stdio",
"timeout": 30,
"command": "uvx",
"args": ["terminal-dhr-mcp"]
}
}
}
```
## 📖 详细文档
### 工具API参考
#### 访客管理工具
##### `query_visitor` - 查询访客记录
根据用户ID查询访客预约记录。
**参数:**
```json
{
"user_id": "string" // 必需要查询的用户ID
}
```
**示例:**
```json
{
"user_id": "12345"
}
```
**响应:**
```json
{
"content": [
{
"type": "text",
"text": "查询结果找到2条访客记录..."
}
]
}
```
**错误处理:**
- 用户ID不存在返回"未找到该用户的访客记录"
- 网络错误:返回"网络连接失败,请稍后重试"
- 服务器错误:返回"服务器错误,请稍后重试"
##### `register_visitor` - 注册访客信息
在访客系统中注册新的访客信息。
**参数:**
```json
{
"employeeId": "string", // 必需员工ID最大50字符
"visitorName": "string", // 必需访客姓名最大50字符
"reason": "string", // 必需访问原因最大200字符
"userId": "string" // 必需用户ID最大50字符
}
```
**示例:**
```json
{
"employeeId": "EMP001",
"visitorName": "张三",
"reason": "业务洽谈",
"userId": "12345"
}
```
**验证规则:**
- 所有字段不能为空
- 字段长度限制employeeId(50)、visitorName(50)、reason(200)、userId(50)
- 特殊字符过滤和转义
#### 门控制工具
##### `control_door` - 控制门开关
通过Home Assistant API控制门的开关。
**参数:**
```json
{
"action": "string", // 必需:门操作,"open" 或 "close"
"entity_id": "string" // 可选Home Assistant实体ID默认使用配置中的DOOR_ENTITY_ID
}
```
**示例:**
```json
{
"action": "open"
}
```
**响应:**
```json
{
"content": [
{
"type": "text",
"text": "门已成功打开"
}
]
}
```
**错误处理:**
- 无效操作:返回"无效的门操作,请使用 'open' 或 'close'"
- 连接失败:返回"无法连接到Home Assistant"
- 权限错误:返回"权限不足,请检查访问令牌"
##### `get_door_status` - 获取门状态
获取门的当前状态信息。
**参数:**
```json
{
"entity_id": "string" // 可选Home Assistant实体ID默认使用配置中的DOOR_ENTITY_ID
}
```
**示例:**
```json
{}
```
**响应:**
```json
{
"content": [
{
"type": "text",
"text": "门当前状态:关闭"
}
]
}
```
### 配置详解
#### Home Assistant配置
1. **获取长期访问令牌**
- 登录Home Assistant
- 进入"配置" > "用户" > "长期访问令牌"
- 创建新令牌并复制到环境变量HA_TOKEN
2. **配置门开关实体**
- 确保门开关在Home Assistant中正确配置
- 记录实体ID`switch.door_switch`
- 在环境变量中设置DOOR_ENTITY_ID
3. **网络访问**
- 确保MCP服务器能够访问Home Assistant
- 检查防火墙和网络配置
- 验证HA_URL配置正确
#### 访客系统配置
1. **API端点配置**
- 配置访客系统的API基础地址BASE_URL
- 确保API端点可访问
- 验证API响应格式
2. **超时设置**
- 根据网络情况调整API超时时间VISITOR_API_TIMEOUT
- 建议设置为30秒或更长
- 考虑网络延迟和服务器响应时间
## 🔧 故障排除
### 常见问题
#### 1. 模块导入错误
**错误信息:**
```
ModuleNotFoundError: No module named 'tools'
```
**解决方案:**
- 确保使用相对导入(已在最新版本中修复)
- 重新安装包:`pip install --force-reinstall terminal-dhr-mcp`
- 检查Python环境`python --version`
- 验证安装:`pip list | grep terminal-dhr-mcp`
#### 2. Home Assistant连接失败
**错误信息:**
```
HTTP 401 Unauthorized
```
**解决方案:**
- 检查HA_TOKEN是否正确设置
- 确认令牌未过期
- 验证Home Assistant服务器可访问
- 检查HA_URL配置格式
#### 3. 访客API连接失败
**错误信息:**
```
Connection timeout
```
**解决方案:**
- 检查BASE_URL是否正确
- 确认网络连接正常
- 调整VISITOR_API_TIMEOUT值
- 验证访客系统服务状态
#### 4. MCP服务器启动失败
**错误信息:**
```
Connection closed
```
**解决方案:**
- 检查Python版本需要3.8+
- 确认所有依赖已正确安装
- 验证配置文件格式
- 检查权限设置
#### 5. 门控制操作失败
**错误信息:**
```
Entity not found
```
**解决方案:**
- 检查DOOR_ENTITY_ID配置
- 确认实体在Home Assistant中存在
- 验证实体类型是否为switch或cover
- 检查实体权限设置
### 日志调试
启用详细日志:
```env
LOG_LEVEL=DEBUG
```
查看日志输出以获取详细的错误信息:
```bash
# 查看实时日志
tail -f logs/terminal_dhr_mcp.log
# 查看错误日志
grep ERROR logs/terminal_dhr_mcp.log
```
### 性能优化
1. **连接池配置**
- 调整HTTP连接池大小
- 优化超时设置
2. **缓存策略**
- 实现状态缓存机制
- 减少重复API调用
3. **并发控制**
- 限制并发请求数量
- 实现请求队列管理
## 🛠️ 开发指南
### 项目结构
```
terminal_dhr_mcp/
├── terminal_dhr_mcp/
│ ├── __init__.py # 包初始化
│ ├── main.py # 主程序入口
│ ├── tools.py # 工具定义和处理
│ ├── config.py # 配置管理
│ ├── visitor_service.py # 访客服务
│ └── door_control.py # 门控制服务
├── pyproject.toml # 项目配置
├── requirements.txt # 依赖列表
├── mcp-server-dhr.json # MCP服务器配置
└── README.md # 项目文档
```
### 开发环境设置
1. **克隆项目**
```bash
git clone <repository-url>
cd terminal_dhr_mcp
```
2. **创建虚拟环境**
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows
```
3. **安装开发依赖**
```bash
pip install -e ".[dev,test]"
```
### 添加新工具
1. **定义工具**
`tools.py` 中添加新工具定义:
```python
Tool(
name="new_tool",
description="新工具描述",
inputSchema={
"type": "object",
"properties": {
"param": {"type": "string"}
},
"required": ["param"]
}
)
```
2. **实现处理函数**
添加对应的处理函数:
```python
async def handle_new_tool(arguments: Dict[str, Any]) -> CallToolResult:
# 实现工具逻辑
return CallToolResult(content=[TextContent(text="结果")])
```
3. **注册工具**
在工具列表中添加新工具。
### 测试
运行测试套件:
```bash
# 运行所有测试
pytest
# 运行特定测试
pytest tests/test_visitor_service.py
# 运行代码检查
black .
isort .
flake8 .
mypy .
# 运行覆盖率测试
pytest --cov=terminal_dhr_mcp
```
### 代码质量
项目使用以下工具确保代码质量:
- **Black**: 代码格式化
- **isort**: 导入排序
- **flake8**: 代码检查
- **mypy**: 类型检查
- **pytest**: 单元测试
## 📄 许可证
本项目采用 [MIT License](LICENSE) 许可证。
## 🤝 贡献
欢迎提交Issue和Pull Request
1. Fork 项目
2. 创建功能分支 (`git checkout -b feature/AmazingFeature`)
3. 提交更改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 打开 Pull Request
### 贡献指南
- 遵循现有代码风格
- 添加适当的测试
- 更新相关文档
- 确保所有测试通过
## 📞 支持
- **作者**: Sucan126
- **邮箱**: 632190820@qq.com
## 🔄 更新日志
### v0.1.7
- 修复模块导入问题
- 优化错误处理机制
- 完善API文档
- 增强配置管理
### v0.1.6
- 添加门控制功能
- 实现访客查询API
- 支持环境变量配置
---
**注意**: 使用本软件前请确保遵守相关法律法规和隐私政策。

View File

@@ -0,0 +1,16 @@
{
"mcpServers": {
"terminal-dhr-mcp": {
"disabled": false,
"type": "stdio",
"timeout": 30,
"command": "uvx",
"args": [
"terminal-dhr-mcp"
],
"env": {
"employeeId": "$employeeId$"
}
}
}
}

View File

@@ -0,0 +1,176 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "terminal-dhr-mcp"
version = "0.1.7"
description = "Terminal DHR MCP Skill Package - Visitor Information Query and Registration System"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Sucan126", email = "632190820@qq.com"}
]
maintainers = [
{name = "Sucan126", email = "632190820@qq.com"}
]
keywords = ["mcp", "terminal", "dhr", "visitor", "api", "protocol"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Communications",
"Topic :: Internet :: WWW/HTTP :: HTTP Servers",
]
requires-python = ">=3.8"
dependencies = [
"mcp>=1.0.0",
"httpx>=0.24.0",
"pydantic>=2.0.0",
"python-dotenv>=1.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"black>=23.0.0",
"isort>=5.12.0",
"flake8>=6.0.0",
"mypy>=1.0.0",
]
test = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.0.0",
]
[project.scripts]
terminal-dhr-mcp = "terminal_dhr_mcp.main:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["*"]
exclude = ["tests*", "test_*", "*.tests", "*.test"]
[tool.setuptools.package-data]
"*" = ["*.md", "*.txt", "*.toml"]
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
# directories
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88
known_first_party = ["terminal_dhr_mcp"]
known_third_party = ["mcp", "httpx", "pydantic"]
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true
[[tool.mypy.overrides]]
module = [
"mcp.*",
"httpx.*",
]
ignore_missing_imports = true
[tool.pytest.ini_options]
minversion = "7.0"
addopts = "-ra -q --strict-markers --strict-config"
testpaths = ["tests", "test_*"]
asyncio_mode = "auto"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
"unit: marks tests as unit tests",
]
[tool.coverage.run]
source = ["."]
omit = [
"*/tests/*",
"*/test_*",
"*/venv/*",
"*/env/*",
"*/__pycache__/*",
"*/migrations/*",
"*/migrations/*",
"setup.py",
"manage.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]
[tool.ruff]
target-version = "py38"
line-length = 88
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long, handled by black
"B008", # do not perform function calls in argument defaults
"C901", # too complex
]
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"]
"tests/*" = ["B011"]
[tool.ruff.isort]
known-first-party = ["terminal_dhr_mcp"]

View File

@@ -0,0 +1,4 @@
mcp>=1.0.0
python-dotenv>=1.0.0
httpx>=0.24.0
pydantic>=2.0.0

View File

@@ -0,0 +1,13 @@
"""
Terminal DHR MCP Package
"""
__version__ = "0.0.1"
__author__ = "Sucan126"
__email__ = "632190820@qq.com"
from .main import main
if __name__ == "__main__":
import asyncio
asyncio.run(main())

View File

@@ -0,0 +1,39 @@
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
"""Terminal DHR MCP Service Configuration"""
# Employee configuration
EMPLOYEE_ID = os.getenv("employeeId", "")
BASE_URL = os.getenv("BASE_URL", "http://192.168.2.236:8088")
# Visitor system API configuration
VISITOR_API_TIMEOUT = int(os.getenv("VISITOR_API_TIMEOUT", "30"))
# API endpoints
VISITOR_QUERY_ENDPOINT = "/system/visitorRecord/mcp/validRecordsByUserId"
VISITOR_REGISTER_ENDPOINT = "/system/visitorRecord/mcp/addFromVisitorByUserId"
# New door control API configuration
DOOR_API_ENDPOINT = "/space_iot/operation/call"
DOOR_API_TIMEOUT = int(os.getenv("DOOR_API_TIMEOUT", "10"))
DOOR_ENTERPRISE_ID = os.getenv("DOOR_ENTERPRISE_ID", "")
DOOR_DEFAULT_ENTITY_ID = os.getenv("DOOR_DEFAULT_ENTITY_ID", "switch.giot_cn_1110921716_v51ksm_on_p_2_1")
# Request headers configuration
DEFAULT_HEADERS = {
"Accept": "application/json"
}
# Logging configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# Data validation configuration
MAX_NAME_LENGTH = 50
MAX_REASON_LENGTH = 200
MAX_EMPLOYEE_ID_LENGTH = 50
MAX_USER_ID_LENGTH = 50

View File

@@ -0,0 +1,215 @@
import httpx
import logging
from typing import Dict, Any, Optional
from .config import Config
logger = logging.getLogger(__name__)
class DoorControlService:
"""Door control service using new API interface"""
def __init__(self):
self.base_url = Config.BASE_URL
self.endpoint = Config.DOOR_API_ENDPOINT
self.enterprise_id = Config.DOOR_ENTERPRISE_ID
self.default_entity_id = Config.DOOR_DEFAULT_ENTITY_ID
self.timeout = Config.DOOR_API_TIMEOUT
async def control_door(
self,
action: str,
entity_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Control door via new API interface
Args:
action: 'open' or 'close'
entity_id: Entity ID for the door device (optional, uses default if not provided)
Returns:
Dict with success status and message
"""
try:
# Validate action
if action.lower() not in ['open', 'close']:
return {
"success": False,
"message": f"Invalid action '{action}'. Must be 'open' or 'close'"
}
# Use provided parameters or defaults
target_entity_id = entity_id or self.default_entity_id
# Map action to API command
command_mapping = {
'open': 'turn_on',
'close': 'turn_off'
}
command = command_mapping.get(action.lower())
# Prepare request data according to new API format
json_body = {
"enterpriseId": self.enterprise_id,
"entityId": target_entity_id,
"command": command
}
# Prepare headers
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "Apifox/1.0.0 (https://apifox.com)",
"Connection": "keep-alive"
}
# Construct full URL
url = f"{self.base_url}{self.endpoint}"
logger.info(f"Attempting to {action} - Entity: {target_entity_id}, Command: {command}, URL: {url}")
# Make API request using httpx
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, json=json_body, headers=headers)
if response.status_code == 200:
result_data = response.json()
logger.info(f"Door control successful: {action}")
return {
"success": True,
"message": f"Door control successful: {action}",
"action": action,
"entity_id": target_entity_id,
"command": command,
"response": result_data
}
else:
error_text = response.text
logger.error(f"Door control failed: HTTP {response.status_code} - {error_text}")
return {
"success": False,
"message": f"Door control failed: HTTP {response.status_code} - {error_text}",
"action": action,
"entity_id": target_entity_id,
"command": command
}
except httpx.TimeoutException:
logger.error("Timeout during door control request")
return {
"success": False,
"message": "Request timeout, please check network connection",
"action": action
}
except httpx.RequestError as e:
logger.error(f"Network error during door control: {e}")
return {
"success": False,
"message": f"Network connection error: {str(e)}",
"action": action
}
except Exception as e:
logger.error(f"Unexpected error during door control: {e}")
return {
"success": False,
"message": f"System error: {str(e)}",
"action": action
}
async def get_door_status(self, entity_id: Optional[str] = None) -> Dict[str, Any]:
"""
Get current door status from new API interface
Args:
entity_id: Entity ID for the door device (optional, uses default if not provided)
Returns:
Dict with door status information optimized for AI understanding
"""
try:
target_entity_id = entity_id or self.default_entity_id
# Prepare request data for status query
json_body = {
"enterpriseId": self.enterprise_id,
"entityId": target_entity_id,
"command": "get_status" # Assuming this command exists for status query
}
# Prepare headers
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "Apifox/1.0.0 (https://apifox.com)",
"Connection": "keep-alive"
}
# Construct full URL
url = f"{self.base_url}{self.endpoint}"
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, json=json_body, headers=headers)
if response.status_code == 200:
result_data = response.json()
# Extract status information from response
# Note: This is a placeholder implementation as the actual response format
# from the new API is not known. You may need to adjust this based on
# the actual API response structure.
# Default status mapping
status_mapping = {
"on": "open",
"off": "closed",
"unavailable": "unavailable",
"unknown": "unknown"
}
# Try to extract status from response
current_state = "unknown"
if isinstance(result_data, dict):
current_state = result_data.get("state", "unknown")
if current_state in status_mapping:
current_state = status_mapping[current_state]
# Build AI-friendly response
ai_response = {
"success": True,
"device_name": target_entity_id,
"current_state": current_state,
"raw_state": result_data.get("state", "unknown"),
"entity_id": target_entity_id,
"response": result_data
}
return ai_response
else:
error_text = response.text
return {
"success": False,
"message": f"Failed to get door status: HTTP {response.status_code} - {error_text}",
"entity_id": target_entity_id
}
except httpx.TimeoutException:
logger.error("Timeout getting door status")
return {
"success": False,
"message": "Timeout getting door status, please check network connection",
"entity_id": entity_id or self.default_entity_id
}
except httpx.RequestError as e:
logger.error(f"Network error getting door status: {e}")
return {
"success": False,
"message": f"Network error getting door status: {str(e)}",
"entity_id": entity_id or self.default_entity_id
}
except Exception as e:
logger.error(f"Error getting door status: {e}")
return {
"success": False,
"message": f"Error getting door status: {str(e)}",
"entity_id": entity_id or self.default_entity_id
}

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""
Terminal DHR MCP Skill Package
"""
import asyncio
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
)
from .tools import handle_list_tools, handle_call_tool, tools
from .config import Config
# Configure logging
logging.basicConfig(
level=getattr(logging, Config.LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class DhrMCPServer:
"""Terminal DHR MCP Server"""
def __init__(self):
self.server = Server("terminal-dhr-mcp")
# Register handlers
self._register_handlers()
def _register_handlers(self):
"""Register MCP handlers"""
@self.server.list_tools()
async def list_tools():
logger.info("Received tool list request")
return tools # 直接返回工具列表
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
logger.info(f"Received tool call request: {name}")
result = await handle_call_tool(name, arguments)
return result.content # 直接返回内容列表
async def _main():
"""Main async function"""
logger.info("Starting terminal-dhr-mcp server...")
# Create server instance
server = DhrMCPServer()
# Start stdio server
async with stdio_server() as (read_stream, write_stream):
logger.info("MCP server started, waiting for connections...")
await server.server.run(
read_stream,
write_stream,
server.server.create_initialization_options()
)
def main():
"""Main entry point for the package"""
try:
asyncio.run(_main())
except KeyboardInterrupt:
logger.info("Server interrupted by user")
except Exception as e:
logger.error(f"Server runtime error: {e}")
raise
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,311 @@
from typing import Any, Dict, List
from mcp import StdioServerParameters
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
from .visitor_service import VisitorService, VisitorInfo
from .door_control import DoorControlService
from .config import Config
# Create service instances
visitor_service = VisitorService()
door_control_service = DoorControlService()
# Define tool list
tools = [
Tool(
name="query_visitor",
description="Query visitor reservation records in the visitor system by user ID",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID to query reservation records"
}
},
"required": ["user_id"]
}
),
Tool(
name="register_visitor",
description="Register new visitor information in the visitor system",
inputSchema={
"type": "object",
"properties": {
"visitor_name": {
"type": "string",
"description": "Visitor name",
"maxLength": 50
},
"reason": {
"type": "string",
"description": "Visit reason",
"maxLength": 200
},
"user_id": {
"type": "string",
"description": "User ID",
"maxLength": 50
}
},
"required": ["visitor_name", "reason", "user_id"]
}
),
Tool(
name="control_door",
description="Control door via IoT API - open or close the door",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"description": "Door action to perform",
"enum": ["open", "close"],
"default": "open"
},
"entity_id": {
"type": "string",
"description": "IoT entity ID for the door device (optional, uses default if not provided)"
}
},
"required": ["action"]
}
),
Tool(
name="get_door_status",
description="Get current door status from IoT API",
inputSchema={
"type": "object",
"properties": {
"entity_id": {
"type": "string",
"description": "IoT entity ID for the door device (optional, uses default if not provided)"
}
}
}
)
]
async def handle_list_tools(request: ListToolsRequest) -> ListToolsResult:
"""Handle tool list request"""
return ListToolsResult(tools=tools)
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Handle tool call request"""
if name == "query_visitor":
return await handle_query_visitor(arguments)
elif name == "register_visitor":
return await handle_register_visitor(arguments)
elif name == "control_door":
return await handle_control_door(arguments)
elif name == "get_door_status":
return await handle_get_door_status(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
async def handle_query_visitor(arguments: Dict[str, Any]) -> CallToolResult:
"""Handle visitor reservation query request"""
user_id = arguments.get("user_id")
if not user_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Missing required user_id parameter"
)
]
)
# Call visitor service query
result = await visitor_service.query_visitor(user_id)
if result["success"]:
visitor_data = result["data"]
# Return structured JSON data for AI processing
if visitor_data:
response_data = {
"success": True,
"user_id": user_id,
"visitor_data": visitor_data,
"message": "Query successful"
}
else:
response_data = {
"success": True,
"user_id": user_id,
"visitor_data": None,
"message": "No visitor data available"
}
# Convert to JSON string for AI consumption
import json
response_text = json.dumps(response_data, ensure_ascii=False, indent=2)
else:
response_text = f" Query failed: {result['message']}"
return CallToolResult(
content=[
TextContent(
type="text",
text=response_text
)
]
)
async def handle_register_visitor(arguments: Dict[str, Any]) -> CallToolResult:
"""Handle visitor registration request"""
visitor_name = arguments.get("visitor_name")
reason = arguments.get("reason")
user_id = arguments.get("user_id")
if not visitor_name or not reason or not user_id:
return CallToolResult(
content=[
TextContent(
type="text",
text="Error: Missing required parameters (visitor_name, reason, user_id)"
)
]
)
try:
# Create visitor information object
visitor_info = VisitorInfo(
employeeId=Config.EMPLOYEE_ID,
visitorName=visitor_name,
reason=reason,
userId=user_id
)
# Call visitor service registration
result = await visitor_service.register_visitor(visitor_info)
if result["success"]:
response_text = f" Visitor registration successful\n\n• Message: {result.get('message', 'N/A')}\n• Review Status: {result.get('status', 'N/A')}"
else:
response_text = f" Visitor registration failed: {result['message']}"
return CallToolResult(
content=[
TextContent(
type="text",
text=response_text
)
]
)
except ValueError as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f" Data validation failed: {str(e)}"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f" System error: {str(e)}"
)
]
)
async def handle_control_door(arguments: Dict[str, Any]) -> CallToolResult:
"""Handle door control request"""
action = arguments.get("action")
entity_id = arguments.get("entity_id")
if not action:
return CallToolResult(
content=[
TextContent(
type="text",
text=" Error: Missing required 'action' parameter. Must be 'open' or 'close'"
)
]
)
try:
# Call door control service
result = await door_control_service.control_door(
action=action,
entity_id=entity_id
)
if result["success"]:
response_text = f" {result['message']}\n\nOperation Details:\n• Action: {result['action']}\n• Entity ID: {result['entity_id']}"
if result.get('response'):
response_text += f"\n• Response: {result['response']}"
else:
response_text = f" {result['message']}"
return CallToolResult(
content=[
TextContent(
type="text",
text=response_text
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f" Door control operation failed: {str(e)}"
)
]
)
async def handle_get_door_status(arguments: Dict[str, Any]) -> CallToolResult:
"""Handle door status query request"""
entity_id = arguments.get("entity_id")
try:
# Call door control service to get status
result = await door_control_service.get_door_status(entity_id=entity_id)
if result["success"]:
response_text = f" Door status query successful\n\nStatus Information:\n• Device Name: {result['device_name']}\n• Current Status: {result['current_state']}\n• Last Updated: {result.get('last_updated', 'N/A')}"
# Add device type if available
if result.get("device_type"):
response_text += f"\n• Device Type: {result['device_type']}"
else:
response_text = f" {result['message']}"
return CallToolResult(
content=[
TextContent(
type="text",
text=response_text
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f" Door status query failed: {str(e)}"
)
]
)

View File

@@ -0,0 +1,260 @@
import httpx
import logging
from typing import Dict, Optional, Any
from pydantic import BaseModel, Field, validator
from .config import Config
# Configure logging
logging.basicConfig(level=getattr(logging, Config.LOG_LEVEL))
logger = logging.getLogger(__name__)
class VisitorInfo(BaseModel):
"""Visitor information model"""
employeeId: str = Field(..., description="Employee ID", max_length=Config.MAX_EMPLOYEE_ID_LENGTH)
visitorName: str = Field(..., description="Visitor name", max_length=Config.MAX_NAME_LENGTH)
reason: str = Field(..., description="Visit reason", max_length=Config.MAX_REASON_LENGTH)
userId: str = Field(..., description="User ID", max_length=Config.MAX_USER_ID_LENGTH)
@validator('employeeId')
def validate_employee_id(cls, v):
if not v.strip():
raise ValueError("Employee ID cannot be empty")
return v.strip()
@validator('visitorName')
def validate_visitor_name(cls, v):
if not v.strip():
raise ValueError("Visitor name cannot be empty")
return v.strip()
@validator('reason')
def validate_reason(cls, v):
if not v.strip():
raise ValueError("Visit reason cannot be empty")
return v.strip()
@validator('userId')
def validate_user_id(cls, v):
if not v.strip():
raise ValueError("User ID cannot be empty")
return v.strip()
class VisitorService:
"""Visitor service class"""
def __init__(self):
self.base_url = Config.BASE_URL
self.timeout = Config.VISITOR_API_TIMEOUT
self.headers = Config.DEFAULT_HEADERS
async def query_visitor(self, user_id: str) -> Dict[str, Any]:
"""
Query visitor reservation records by user ID
Args:
user_id: User ID
Returns:
Visitor reservation records dictionary
"""
try:
url = f"{self.base_url}{Config.VISITOR_QUERY_ENDPOINT}"
params = {"userId": user_id}
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(url, params=params, headers=self.headers)
if response.status_code == 200:
response_data = response.json()
# Extract and filter relevant visitor information
if response_data.get("code") == 200 and "data" in response_data:
visitor_data = response_data["data"]
# Status mapping
status_map = {
0: "Under Review",
1: "Approved",
2: "Rejected",
3: "Unknown"
}
# Gender mapping
gender_map = {
"1": "Female",
"2": "Male",
"0": "Unknown"
}
# Filter only essential information for terminal display
filtered_data = {
"visitorName": visitor_data.get("visitorName"),
"visitTime": visitor_data.get("visitTime"),
"expireTime": visitor_data.get("expireTime"),
"receptionLocation": visitor_data.get("receptionLocation"),
"status": visitor_data.get("status"),
"visitorSex": visitor_data.get("visitorSex")
}
# Add text mappings for AI processing
status_value = filtered_data.get("status")
gender_value = str(filtered_data.get("visitorSex", ""))
enhanced_data = {
**filtered_data,
"status_text": status_map.get(status_value, f"Unknown Status({status_value})"),
"gender_text": gender_map.get(gender_value, f"Unknown({gender_value})")
}
logger.info(f"Successfully queried visitor reservation records: {user_id}")
return {
"success": True,
"data": enhanced_data,
"message": "Query successful"
}
else:
# Handle API error response
error_message = response_data.get("msg", "Unknown error")
logger.warning(f"API returned error: {error_message}")
return {
"success": False,
"data": None,
"message": error_message
}
else:
# Handle error response with specific format
try:
error_data = response.json()
error_message = error_data.get("msg", "Unknown error")
error_code = error_data.get("code", response.status_code)
logger.warning(f"API error response: {error_code} - {error_message}")
return {
"success": False,
"data": None,
"message": error_message
}
except:
# Fallback for non-JSON error responses
logger.error(f"Failed to query reservation records: {response.status_code} - {response.text}")
return {
"success": False,
"data": None,
"message": f"Query failed: HTTP {response.status_code}"
}
except httpx.TimeoutException:
logger.error(f"Query reservation records timeout: {user_id}")
return {
"success": False,
"data": None,
"message": "Request timeout, please try again later"
}
except httpx.RequestError as e:
logger.error(f"Query reservation records network error: {e}")
return {
"success": False,
"data": None,
"message": f"Network error: {str(e)}"
}
except Exception as e:
logger.error(f"Query reservation records exception: {e}")
return {
"success": False,
"data": None,
"message": f"System error: {str(e)}"
}
async def register_visitor(self, visitor_info: VisitorInfo) -> Dict[str, Any]:
"""
Register visitor information
Args:
visitor_info: Visitor information object
Returns:
Registration result dictionary
"""
try:
url = f"{self.base_url}{Config.VISITOR_REGISTER_ENDPOINT}"
payload = visitor_info.dict()
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(url, params=payload, headers=self.headers)
if response.status_code == 200:
result_data = response.json()
api_code = result_data.get("code")
api_message = result_data.get("msg", "")
status_map = {
0: "Under Review",
1: "Approved",
2: "Rejected",
3: "Unknown"
}
if api_code == 200:
# Registration successful
api_data = result_data.get("data")
status_text = status_map.get(api_data, f"Unknown Status({api_data})") if api_data is not None else "Unknown Status"
logger.info(f"Successfully registered visitor: {visitor_info.visitorName}")
return {
"success": True,
"data": result_data,
"message": api_message,
"status": status_text
}
elif api_code == 2:
# Duplicate registration
logger.warning(f"Visitor already registered: {visitor_info.visitorName}")
return {
"success": False,
"data": None,
"message": api_message
}
elif api_code == 1:
# Missing visitor name
logger.warning(f"Missing visitor name: {api_message}")
return {
"success": False,
"data": None,
"message": api_message
}
else:
# Other business errors
logger.warning(f"API business error for visitor {visitor_info.visitorName}: code={api_code}, message={api_message}")
return {
"success": False,
"data": None,
"message": api_message
}
else:
# HTTP error
logger.error(f"Visitor registration HTTP error: {response.status_code} - {response.text}")
return {
"success": False,
"data": None,
"message": f"HTTP error: {response.status_code}"
}
except httpx.TimeoutException:
logger.error(f"Visitor registration timeout: {visitor_info.visitorName}")
return {
"success": False,
"data": None,
"message": "Request timeout, please try again later"
}
except httpx.RequestError as e:
logger.error(f"Visitor registration network error: {e}")
return {
"success": False,
"data": None,
"message": f"Network error: {str(e)}"
}
except Exception as e:
logger.error(f"Visitor registration exception: {e}")
return {
"success": False,
"data": None,
"message": f"System error: {str(e)}"
}