Files
lzwcai-mcp/file_tools/file_tools/tools.py
Sucan 135c8e379e feat: 添加资产变动确认卡片和Excel图片解析功能
- 新增 `send_asset_confirmation_card` 工具,用于发送资产变动确认卡片
- 新增 `upload_image_by_excel` 工具,支持从Excel中根据image_key定位并上传图片
- 在file-tools中添加 `excel_image_key_to_temp_file` 和 `upload_file_to_minio` 工具
- 新增配置文件管理和MinIO集成支持
- 更新项目依赖版本,添加openpyxl和minio库
2026-02-13 19:57:02 +08:00

527 lines
20 KiB
Python

import asyncio
import base64
import hashlib
import json
import mimetypes
import re
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple
from xml.etree import ElementTree as ET
import httpx
from minio import Minio
from mcp.types import TextContent, Tool
from .config import Config
# Catalogue of the tools this module exposes. Each entry declares the tool
# name, a human-readable description and a JSON-Schema style `inputSchema`
# describing its arguments. The names listed here are the ones matched by
# `handle_call_tool`.
tools: List[Tool] = [
# file_to_json -- "dump a file object to a JSON string".
# Accepts an inline `file` object and/or a `file_path`; the schema marks
# neither as required, but the implementation raises when both are absent.
Tool(
name="file_to_json",
description="将文件对象转储为 JSON 字符串",
inputSchema={
"type": "object",
"properties": {
"file": {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {"type": "string"},
"size": {"type": "integer"},
"last_modified": {"type": "integer"},
"content_base64": {"type": "string"}
},
"required": ["name"]
},
"file_path": {"type": "string"}
},
"required": []
}
),
# file_to_data_uri -- "convert a file to a data URI (Base64 encoded)".
# The top-level `type` is an optional MIME type that takes precedence over
# the one carried inside `file`.
Tool(
name="file_to_data_uri",
description="将文件转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"file": {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {"type": "string"},
"size": {"type": "integer"},
"last_modified": {"type": "integer"},
"content_base64": {"type": "string"}
},
"required": ["name", "content_base64"]
},
"type": {"type": "string"},
"file_path": {"type": "string"}
},
"required": []
}
),
# file_path_to_data_uri -- "convert a file path to a data URI (Base64 encoded)".
Tool(
name="file_path_to_data_uri",
description="将文件路径转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"file_path": {"type": "string"},
"type": {"type": "string"}
},
"required": ["file_path"]
}
),
# url_to_data_uri -- "convert a file URL to a data URI (Base64 encoded)".
Tool(
name="url_to_data_uri",
description="将文件 URL 转换为 data URI(Base64 编码)",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string"}
},
"required": ["url"]
}
),
# url_to_temp_file -- "download a file URL to a temporary file and return its path".
# The optional `file_path` is used as the destination instead of a temp file.
Tool(
name="url_to_temp_file",
description="将文件 URL 下载为临时文件并返回路径",
inputSchema={
"type": "object",
"properties": {
"url": {"type": "string"},
"file_path": {"type": "string"}
},
"required": ["url"]
}
),
# excel_image_key_to_temp_file -- "locate an image inside an Excel file by
# image_key and turn it into a temporary file path".
Tool(
name="excel_image_key_to_temp_file",
description="根据Excel内image_key定位图片并转为临时文件路径",
inputSchema={
"type": "object",
"properties": {
"excel_path": {"type": "string"},
"image_key": {"type": "string"}
},
"required": ["excel_path", "image_key"]
}
),
# upload_file_to_minio -- "upload a local file to MinIO and return its URL".
Tool(
name="upload_file_to_minio",
description="上传本地文件到MinIO并返回URL",
inputSchema={
"type": "object",
"properties": {
"file_path": {"type": "string"}
},
"required": ["file_path"]
}
)
]
def _guess_mime_type(name: Optional[str]) -> str:
if not name:
return "application/octet-stream"
mime_type, _ = mimetypes.guess_type(name)
return mime_type or "application/octet-stream"
def _is_url(value: Optional[str]) -> bool:
if not value:
return False
parsed = urlparse(value)
return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
def _decode_base64(content_base64: str) -> bytes:
try:
return base64.b64decode(content_base64, validate=True)
except Exception:
return base64.b64decode(content_base64)
def _build_data_uri(data: bytes, mime_type: str) -> str:
encoded = base64.b64encode(data).decode("ascii")
return f"data:{mime_type};base64,{encoded}"
def _extract_file_payload(arguments: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[bytes], Optional[str]]:
file_payload = arguments.get("file")
file_path = arguments.get("file_path")
if file_payload is None and file_path is None:
raise ValueError("missing file or file_path")
name = None
mime_type = None
size = None
last_modified = None
content_base64 = None
data = None
if file_payload is not None:
name = file_payload.get("name")
mime_type = file_payload.get("type")
size = file_payload.get("size")
last_modified = file_payload.get("last_modified")
content_base64 = file_payload.get("content_base64")
if file_path:
path = Path(file_path)
if name is None:
name = path.name
if content_base64 is None:
data = path.read_bytes()
if data is None and content_base64 is not None:
data = _decode_base64(content_base64)
return (
{
"name": name,
"type": mime_type,
"size": size,
"last_modified": last_modified,
"content_base64": content_base64
},
data,
name
)
def _normalize_mime_type(mime_type: Optional[str], name: Optional[str]) -> str:
if mime_type:
return mime_type
return _guess_mime_type(name)
def _normalize_header(value: Any) -> str:
if value is None:
return ""
return str(value).strip().lower()
def _extract_dispimg_id(value: Any) -> str:
if value is None:
return ""
text = str(value).strip()
if not text:
return ""
match = re.search(r"dispimg\(\s*\"([^\"]+)\"", text, re.IGNORECASE)
if match:
return match.group(1).strip()
match = re.search(r"dispimg\(\s*'([^']+)'", text, re.IGNORECASE)
if match:
return match.group(1).strip()
return text
def _resolve_dispimg_temp_file(excel_path: str, image_key: str) -> str:
    """Extract the cell image referenced by a DISPIMG id into a temp file.

    The workbook is opened as a zip archive. ``xl/cellimages.xml`` maps image
    names to relationship ids, and ``xl/_rels/cellimages.xml.rels`` maps
    those ids to the image parts stored in the archive.

    Args:
        excel_path: Path to the workbook (a zip-based Excel file).
        image_key: A DISPIMG formula or a bare image id. The id is matched
            with and without an ``ID_`` prefix, exactly first and then
            case-insensitively.

    Returns:
        Path of a newly created temporary file holding the image bytes. The
        file is not deleted automatically; the caller owns it.

    Raises:
        ValueError: If the id is empty, the cell-image parts are missing, no
            cell images are declared, the id is unknown, or the referenced
            image part cannot be found.
        zipfile.BadZipFile: If *excel_path* is not a valid zip archive.
    """
    import posixpath

    image_id = _extract_dispimg_id(image_key)
    if not image_id:
        raise ValueError("missing dispimg id")

    cell_images_part = "xl/cellimages.xml"
    rels_part = "xl/_rels/cellimages.xml.rels"
    rel_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
    namespaces = {
        "etc": "http://www.wps.cn/officeDocument/2017/etCustomData",
        "xdr": "http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing",
        "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
        "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    }

    with zipfile.ZipFile(excel_path) as zip_ref:
        names = set(zip_ref.namelist())
        if cell_images_part not in names:
            raise ValueError("cellimages.xml not found in excel")
        if rels_part not in names:
            raise ValueError("cellimages.xml.rels not found in excel")

        # Relationship id -> target part, as declared in the .rels file.
        rels_root = ET.fromstring(zip_ref.read(rels_part))
        rels_map: Dict[str, str] = {}
        for rel in rels_root.findall(f"{{{rel_ns}}}Relationship"):
            rel_id = rel.attrib.get("Id")
            rel_target = rel.attrib.get("Target")
            if rel_id and rel_target:
                rels_map[rel_id] = rel_target

        # Image name (the DISPIMG id) -> relationship id of its picture.
        cell_root = ET.fromstring(zip_ref.read(cell_images_part))
        embed_attr = f"{{{namespaces['r']}}}embed"
        name_to_embed: Dict[str, str] = {}
        for cell_image in cell_root.findall("etc:cellImage", namespaces):
            c_nv_pr = cell_image.find(".//xdr:cNvPr", namespaces)
            blip = cell_image.find(".//a:blip", namespaces)
            name = c_nv_pr.attrib.get("name") if c_nv_pr is not None else ""
            embed = blip.attrib.get(embed_attr) if blip is not None else ""
            if name and embed:
                name_to_embed[name] = embed
        if not name_to_embed:
            raise ValueError("no cell images found in excel")

        # Accept the id both with and without the "ID_" prefix.
        if image_id.startswith("ID_"):
            candidates = [image_id, image_id[3:]]
        else:
            candidates = [image_id, f"ID_{image_id}"]
        name_to_embed_lower = {key.lower(): value for key, value in name_to_embed.items()}
        embed_id = ""
        for candidate in candidates:
            if candidate in name_to_embed:
                embed_id = name_to_embed[candidate]
                break
            lowered = candidate.lower()
            if lowered in name_to_embed_lower:
                embed_id = name_to_embed_lower[lowered]
                break
        if not embed_id:
            raise ValueError("dispimg id not found in excel")

        target = rels_map.get(embed_id)
        if not target:
            raise ValueError("dispimg image target not found in excel")

        # A Target starting with "/" is relative to the package root; any
        # other Target is relative to the folder of the source part (xl/)
        # and may contain "..". The original "xl/" + stripped-target form is
        # kept as a last resort so previously working files still resolve.
        if target.startswith("/"):
            resolved = target.lstrip("/")
        else:
            resolved = posixpath.normpath(posixpath.join("xl", target))
        legacy = f"xl/{target.lstrip('/')}"
        target_path = next((part for part in (resolved, legacy) if part in names), None)
        if target_path is None:
            raise ValueError("dispimg image file missing in excel")

        # Read before creating the temp file so a failed read leaves no
        # orphan file behind.
        image_bytes = zip_ref.read(target_path)

    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(target).suffix) as temp_file:
        temp_file.write(image_bytes)
        return temp_file.name
def _resolve_image_source_from_excel(excel_path: str, image_key: str) -> str:
    """Look up the image source for *image_key* in the workbook's first sheet.

    The first row is treated as the header. The key column is the first of
    ``image_key``, ``imagekey`` or ``key`` that exists (headers are compared
    trimmed and lower-cased). Only the first data row whose key cell equals
    *image_key* is considered. Its source is the first non-blank cell among
    the known image columns, or the key cell itself when all are blank.

    Args:
        excel_path: Path to the workbook.
        image_key: Key to search for, compared against the trimmed cell text.

    Returns:
        A URL, an absolute path, or a path resolved relative to the
        workbook's directory.

    Raises:
        ValueError: If the header row is empty, no key column exists, or no
            source can be determined for *image_key*.
    """
    from openpyxl import load_workbook

    workbook = load_workbook(excel_path, read_only=True, data_only=True)
    try:
        sheet = workbook.worksheets[0]
        header_row = next(sheet.iter_rows(min_row=1, max_row=1, values_only=True), None)
        if not header_row:
            raise ValueError("excel header row is empty")

        # Normalised header -> column index; a later duplicate overrides an
        # earlier one.
        header_map: Dict[str, int] = {}
        for position, raw_header in enumerate(header_row):
            normalized = _normalize_header(raw_header)
            if normalized:
                header_map[normalized] = position

        key_col_index = next(
            (header_map[alias] for alias in ("image_key", "imagekey", "key") if alias in header_map),
            None,
        )
        if key_col_index is None:
            raise ValueError("missing image_key column in excel")

        candidate_columns = ["image_path", "image_url", "image", "url", "file_path", "path", "image_file"]
        candidate_indices = [header_map[column] for column in candidate_columns if column in header_map]

        # Locate the first data row whose key cell matches.
        matching_row = None
        for row in sheet.iter_rows(min_row=2, values_only=True):
            if key_col_index >= len(row):
                continue
            key_cell = row[key_col_index]
            if key_cell is not None and str(key_cell).strip() == image_key:
                matching_row = row
                break

        matched_source = ""
        if matching_row is not None:
            cell_texts = (
                str(matching_row[index]).strip()
                for index in candidate_indices
                if index < len(matching_row) and matching_row[index] is not None
            )
            matched_source = next((text for text in cell_texts if text), "")
            if not matched_source:
                # No dedicated source column is filled in: fall back to the
                # key cell's own text.
                matched_source = str(matching_row[key_col_index]).strip()

        if not matched_source:
            raise ValueError(f"missing image source for image_key: {image_key}")

        if not _is_url(matched_source) and not Path(matched_source).is_absolute():
            matched_source = str(Path(excel_path).parent / matched_source)
        return matched_source
    finally:
        workbook.close()
def _build_local_temp_file(file_path: str) -> str:
source_path = Path(file_path)
if not source_path.is_file():
raise FileNotFoundError(f"image file not found: {file_path}")
suffix = source_path.suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
temp_path = Path(temp_file.name)
temp_path.write_bytes(source_path.read_bytes())
return str(temp_path)
def _normalize_minio_endpoint(endpoint: str) -> Tuple[str, Optional[bool], str]:
raw = str(endpoint).strip()
if not raw:
return "", None, ""
if raw.startswith("http://") or raw.startswith("https://"):
parsed = urlparse(raw)
secure = parsed.scheme == "https"
return parsed.netloc, secure, f"{parsed.scheme}://{parsed.netloc}"
return raw, None, f"http://{raw}"
def _hash_file_md5(file_path: str) -> str:
hasher = hashlib.md5()
with open(file_path, "rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
hasher.update(chunk)
return hasher.hexdigest()
def _upload_file_to_minio_sync(file_path: str) -> str:
if not file_path:
raise ValueError("missing file_path")
source_path = Path(file_path)
if not source_path.is_file():
raise FileNotFoundError(f"file_path not found: {file_path}")
endpoint_raw = Config.MINIO_ENDPOINT
access_key = Config.MINIO_ACCESS_KEY
secret_key = Config.MINIO_SECRET_KEY
if not endpoint_raw:
raise ValueError("missing minio_endpoint")
if not access_key:
raise ValueError("missing minio_access_key")
if not secret_key:
raise ValueError("missing minio_secret_key")
endpoint, endpoint_secure, endpoint_base = _normalize_minio_endpoint(endpoint_raw)
if not endpoint:
raise ValueError("invalid minio_endpoint")
secure = endpoint_secure or False
client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure)
bucket = "lzwcai"
prefix = "tmp"
if not client.bucket_exists(bucket):
client.make_bucket(bucket)
date_str = datetime.now().strftime("%Y-%m-%d")
file_hash = _hash_file_md5(file_path)
suffix = source_path.suffix
object_name = f"{prefix}/{date_str}/{file_hash}/{file_hash}{suffix}"
content_type = _guess_mime_type(source_path.name)
client.fput_object(bucket, object_name, file_path, content_type=content_type)
scheme = "https" if secure else "http"
public_base = endpoint_base if endpoint_base else f"{scheme}://{endpoint}"
public_base = f"{public_base.rstrip('/')}/{bucket}"
return f"{public_base}/{object_name}"
def _build_file_json(arguments: Dict[str, Any]) -> str:
    """Serialise the file description in *arguments* to a JSON string.

    A missing ``type`` is guessed from the file name, and a missing ``size``
    is taken from the content length when content is available.

    Args:
        arguments: Tool arguments containing ``file`` and/or ``file_path``.

    Returns:
        JSON text with the keys ``name``, ``type``, ``size``,
        ``last_modified`` and ``content_base64``; non-ASCII characters are
        kept as-is.

    Raises:
        ValueError: If neither ``file`` nor ``file_path`` is provided.
    """
    info, content, file_name = _extract_file_payload(arguments)
    derived: Dict[str, Any] = {}
    if info["type"] is None:
        derived["type"] = _guess_mime_type(file_name)
    if content is not None and info["size"] is None:
        derived["size"] = len(content)
    # Both keys already exist in info, so the key order is unchanged.
    info.update(derived)
    return json.dumps(info, ensure_ascii=False)
def _build_file_data_uri(arguments: Dict[str, Any]) -> str:
    """Build a Base64 data URI from an inline file object or a file path.

    The MIME type is taken from the top-level ``type`` argument, then from
    the file object's ``type``, and finally guessed from the file name.

    Args:
        arguments: Tool arguments containing ``file`` and/or ``file_path``
            and optionally ``type``.

    Returns:
        The data URI string.

    Raises:
        ValueError: If no file is given or no content can be obtained.
    """
    info, content, file_name = _extract_file_payload(arguments)
    if content is None:
        raise ValueError("missing file content for data uri")
    declared_type = arguments.get("type") or info.get("type")
    return _build_data_uri(content, _normalize_mime_type(declared_type, file_name))
def _build_path_data_uri(arguments: Dict[str, Any]) -> str:
file_path = arguments.get("file_path")
if not file_path:
raise ValueError("missing file_path")
path = Path(file_path)
data = path.read_bytes()
mime_type = arguments.get("type") or _guess_mime_type(path.name)
return _build_data_uri(data, mime_type)
async def _build_url_data_uri(arguments: Dict[str, Any]) -> str:
url = arguments.get("url")
if not url:
raise ValueError("missing url")
async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
response = await client.get(url)
response.raise_for_status()
content_type = response.headers.get("content-type", "")
mime_type = content_type.split(";")[0].strip() if content_type else ""
if not mime_type:
mime_type = _guess_mime_type(url)
return _build_data_uri(response.content, mime_type)
async def _build_url_file_path(arguments: Dict[str, Any]) -> str:
url = arguments.get("url")
if not url:
raise ValueError("missing url")
file_path = arguments.get("file_path")
if file_path:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
else:
suffix = Path(urlparse(url).path).suffix
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
path = Path(temp_file.name)
async with httpx.AsyncClient(follow_redirects=True, timeout=20) as client:
response = await client.get(url)
response.raise_for_status()
path.write_bytes(response.content)
return str(path)
async def _excel_image_key_to_temp_file(arguments: Dict[str, Any]) -> str:
    """Resolve an Excel ``image_key`` to a temporary image file path.

    Two strategies are tried in order: the embedded DISPIMG cell image, then
    a lookup in the first worksheet whose result is either downloaded (URL)
    or copied (local path).

    Args:
        arguments: Tool arguments with ``excel_path`` and ``image_key``.

    Returns:
        Path of the temporary file holding the image.

    Raises:
        ValueError: If an argument is missing, the extension is not a
            supported xlsx variant, or both strategies fail.
        FileNotFoundError: If ``excel_path`` does not exist.
    """
    excel_path = arguments.get("excel_path")
    image_key = arguments.get("image_key")
    if not excel_path:
        raise ValueError("missing excel_path")
    if not image_key:
        raise ValueError("missing image_key")
    excel_path = str(excel_path).strip()
    image_key = str(image_key).strip()

    path = Path(excel_path)
    if not path.exists():
        raise FileNotFoundError(f"excel_path not found: {excel_path}")
    extension = path.suffix.lower()
    if extension == ".xls":
        raise ValueError("xls is not supported, please convert to xlsx")
    if extension not in (".xlsx", ".xlsm", ".xltx", ".xltm"):
        raise ValueError("excel_path must be xlsx format")

    try:
        return _resolve_dispimg_temp_file(excel_path, image_key)
    except Exception as dispimg_error:
        # Deliberately broad: any failure of the DISPIMG strategy triggers
        # the table-lookup strategy, and both reasons are reported together
        # if that fails too.
        try:
            image_source = _resolve_image_source_from_excel(excel_path, image_key)
            if _is_url(image_source):
                return await _build_url_file_path({"url": image_source})
            return _build_local_temp_file(image_source)
        except Exception as table_error:
            raise ValueError(
                f"excel image not found: dispimg={dispimg_error}; table={table_error}"
            ) from table_error


async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch a tool call by name and wrap the outcome as text content.

    Failures are never raised to the caller: any exception is reported as a
    ``Failed to call tool <name>: <error>`` text result.

    Args:
        name: Tool name, one of the names declared in ``tools``.
        arguments: Tool arguments; ``None`` is treated as an empty mapping.

    Returns:
        A single-element list holding the tool result, or the failure
        message, as ``TextContent``.
    """
    try:
        # A call without arguments should produce the tool's own "missing ..."
        # message rather than an attribute error on None.
        arguments = arguments or {}

        if name == "file_to_json":
            result = _build_file_json(arguments)
        elif name == "file_to_data_uri":
            file_path = arguments.get("file_path")
            # A URL passed as file_path is fetched instead of read from disk.
            if _is_url(file_path):
                result = await _build_url_data_uri({"url": file_path})
            else:
                result = _build_file_data_uri(arguments)
        elif name == "file_path_to_data_uri":
            file_path = arguments.get("file_path")
            if _is_url(file_path):
                result = await _build_url_data_uri({"url": file_path})
            else:
                result = _build_path_data_uri(arguments)
        elif name == "url_to_data_uri":
            result = await _build_url_data_uri(arguments)
        elif name == "url_to_temp_file":
            result = await _build_url_file_path(arguments)
        elif name == "excel_image_key_to_temp_file":
            result = await _excel_image_key_to_temp_file(arguments)
        elif name == "upload_file_to_minio":
            file_path = str(arguments.get("file_path", "")).strip()
            if not file_path:
                raise ValueError("missing file_path")
            # The upload is blocking, so it runs in a worker thread.
            result = await asyncio.to_thread(_upload_file_to_minio_sync, file_path)
        else:
            raise ValueError(f"unknown tool name: {name}")
        return [TextContent(type="text", text=result)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Failed to call tool {name}: {exc}")]