feat: 新增飞书和文件工具MCP服务并优化机器人配置

- 新增飞书MCP服务,包含图片上传和消息卡片发送功能
- 新增文件工具MCP服务,提供文件转JSON和data URI功能
- 更新机器人MCP服务配置,移除环境变量硬编码
- 升级机器人版本至0.1.28并优化迎宾功能参数
- 添加.gitignore和项目配置文件
This commit is contained in:
2026-02-11 18:57:31 +08:00
parent ff39bdbd8a
commit 0a308726a6
20 changed files with 1005 additions and 22 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,47 @@
import asyncio
import logging
from mcp.server import Server
from mcp.server.stdio import stdio_server
from .tools import handle_call_tool, tools
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class FileToolsServer:
def __init__(self) -> None:
self.server: Server = Server("lzwcai-mcpskills-file-tools-mcp")
self._register_handlers()
def _register_handlers(self) -> None:
@self.server.list_tools()
async def list_tools():
return tools
@self.server.call_tool()
async def call_tool(name: str, arguments: dict):
logger.info("Received tool call request: %s", name)
return await handle_call_tool(name, arguments)
async def _main() -> None:
server = FileToolsServer()
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
server.server.create_initialization_options()
)
def main() -> None:
asyncio.run(_main())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,256 @@
import base64
import json
import mimetypes
import tempfile
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple
import httpx
from mcp.types import TextContent, Tool
tools: List[Tool] = [
Tool(
name="file_to_json",
description="将文件对象转储为 JSON 字符串",
inputSchema={
"type": "object",
"properties": {
"file": {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {"type": "string"},
"size": {"type": "integer"},
"last_modified": {"type": "integer"},
"content_base64": {"type": "string"}
},
"required": ["name"]
},
"file_path": {"type": "string"}
},
"required": []
}
),
Tool(
name="file_to_data_uri",
description="将文件转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"file": {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {"type": "string"},
"size": {"type": "integer"},
"last_modified": {"type": "integer"},
"content_base64": {"type": "string"}
},
"required": ["name", "content_base64"]
},
"type": {"type": "string"},
"file_path": {"type": "string"}
},
"required": []
}
),
Tool(
name="file_path_to_data_uri",
description="将文件路径转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"file_path": {"type": "string"},
"type": {"type": "string"}
},
"required": ["file_path"]
}
),
Tool(
name="url_to_data_uri",
description="将文件 URL 转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string"}
},
"required": ["url"]
}
),
Tool(
name="url_to_temp_file",
description="将文件 URL 下载为临时文件并返回路径",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string"},
"file_path": {"type": "string"}
},
"required": ["url"]
}
)
]
def _guess_mime_type(name: Optional[str]) -> str:
if not name:
return "application/octet-stream"
mime_type, _ = mimetypes.guess_type(name)
return mime_type or "application/octet-stream"
def _is_url(value: Optional[str]) -> bool:
if not value:
return False
parsed = urlparse(value)
return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
def _decode_base64(content_base64: str) -> bytes:
try:
return base64.b64decode(content_base64, validate=True)
except Exception:
return base64.b64decode(content_base64)
def _build_data_uri(data: bytes, mime_type: str) -> str:
encoded = base64.b64encode(data).decode("ascii")
return f"data:{mime_type};base64,{encoded}"
def _extract_file_payload(arguments: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[bytes], Optional[str]]:
file_payload = arguments.get("file")
file_path = arguments.get("file_path")
if file_payload is None and file_path is None:
raise ValueError("missing file or file_path")
name = None
mime_type = None
size = None
last_modified = None
content_base64 = None
data = None
if file_payload is not None:
name = file_payload.get("name")
mime_type = file_payload.get("type")
size = file_payload.get("size")
last_modified = file_payload.get("last_modified")
content_base64 = file_payload.get("content_base64")
if file_path:
path = Path(file_path)
if name is None:
name = path.name
if content_base64 is None:
data = path.read_bytes()
if data is None and content_base64 is not None:
data = _decode_base64(content_base64)
return (
{
"name": name,
"type": mime_type,
"size": size,
"last_modified": last_modified,
"content_base64": content_base64
},
data,
name
)
def _normalize_mime_type(mime_type: Optional[str], name: Optional[str]) -> str:
if mime_type:
return mime_type
return _guess_mime_type(name)
def _build_file_json(arguments: Dict[str, Any]) -> str:
file_info, data, name = _extract_file_payload(arguments)
if file_info["type"] is None:
file_info["type"] = _guess_mime_type(name)
if file_info["size"] is None and data is not None:
file_info["size"] = len(data)
return json.dumps(file_info, ensure_ascii=False)
def _build_file_data_uri(arguments: Dict[str, Any]) -> str:
file_info, data, name = _extract_file_payload(arguments)
if data is None:
raise ValueError("missing file content for data uri")
mime_type = arguments.get("type") or file_info.get("type")
mime_type = _normalize_mime_type(mime_type, name)
return _build_data_uri(data, mime_type)
def _build_path_data_uri(arguments: Dict[str, Any]) -> str:
file_path = arguments.get("file_path")
if not file_path:
raise ValueError("missing file_path")
path = Path(file_path)
data = path.read_bytes()
mime_type = arguments.get("type") or _guess_mime_type(path.name)
return _build_data_uri(data, mime_type)
async def _build_url_data_uri(arguments: Dict[str, Any]) -> str:
url = arguments.get("url")
if not url:
raise ValueError("missing url")
async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
response = await client.get(url)
response.raise_for_status()
content_type = response.headers.get("content-type", "")
mime_type = content_type.split(";")[0].strip() if content_type else ""
if not mime_type:
mime_type = _guess_mime_type(url)
return _build_data_uri(response.content, mime_type)
async def _build_url_file_path(arguments: Dict[str, Any]) -> str:
url = arguments.get("url")
if not url:
raise ValueError("missing url")
file_path = arguments.get("file_path")
if file_path:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
else:
suffix = Path(urlparse(url).path).suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
path = Path(temp_file.name)
async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
response = await client.get(url)
response.raise_for_status()
path.write_bytes(response.content)
return str(path)
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
try:
if name == "file_to_json":
result = _build_file_json(arguments)
elif name == "file_to_data_uri":
file_path = arguments.get("file_path")
if _is_url(file_path):
result = await _build_url_data_uri({"url": file_path})
else:
result = _build_file_data_uri(arguments)
elif name == "file_path_to_data_uri":
file_path = arguments.get("file_path")
if _is_url(file_path):
result = await _build_url_data_uri({"url": file_path})
else:
result = _build_path_data_uri(arguments)
elif name == "url_to_data_uri":
result = await _build_url_data_uri(arguments)
elif name == "url_to_temp_file":
result = await _build_url_file_path(arguments)
else:
raise ValueError(f"unknown tool name: {name}")
return [TextContent(type="text", text=result)]
except Exception as exc:
return [TextContent(type="text", text=f"Failed to call tool {name}: {exc}")]