diff --git a/file_tools/file_tools/config.py b/file_tools/file_tools/config.py
new file mode 100644
index 0000000..c55a1f6
--- /dev/null
+++ b/file_tools/file_tools/config.py
@@ -0,0 +1,29 @@
+import json
+import os
+from pathlib import Path
+
+
+def _load_env_from_config() -> None:
+ config_path = Path(__file__).resolve().parent.parent / "mcp-server-file-tools.json"
+ if not config_path.exists():
+ return
+ config_data = json.loads(config_path.read_text(encoding="utf-8"))
+ env_data = (
+ config_data.get("mcpServers", {})
+ .get("lzwcai-mcpskills-file-tools-mcp", {})
+ .get("env", {})
+ )
+ for key, value in env_data.items():
+ if value is None:
+ continue
+ if os.getenv(key) is None:
+ os.environ[key] = str(value)
+
+
+_load_env_from_config()
+
+
+class Config:
+ MINIO_ENDPOINT = os.getenv("minio_endpoint", "")
+ MINIO_ACCESS_KEY = os.getenv("minio_access_key", "")
+ MINIO_SECRET_KEY = os.getenv("minio_secret_key", "")
diff --git a/file_tools/file_tools/tools.py b/file_tools/file_tools/tools.py
index aa63b2d..8030152 100644
--- a/file_tools/file_tools/tools.py
+++ b/file_tools/file_tools/tools.py
@@ -1,14 +1,22 @@
+import asyncio
import base64
+import hashlib
import json
import mimetypes
+import re
import tempfile
+import zipfile
+from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple
+from xml.etree import ElementTree as ET
import httpx
+from minio import Minio
from mcp.types import TextContent, Tool
+from .config import Config
tools: List[Tool] = [
Tool(
name="file_to_json",
@@ -89,6 +97,29 @@ tools: List[Tool] = [
},
"required": ["url"]
}
+ ),
+ Tool(
+ name="excel_image_key_to_temp_file",
+ description="根据Excel内image_key定位图片并转为临时文件路径",
+ inputSchema={
+ "type": "object",
+ "properties": {
+ "excel_path": {"type": "string"},
+ "image_key": {"type": "string"}
+ },
+ "required": ["excel_path", "image_key"]
+ }
+ ),
+ Tool(
+ name="upload_file_to_minio",
+ description="上传本地文件到MinIO并返回URL",
+ inputSchema={
+ "type": "object",
+ "properties": {
+ "file_path": {"type": "string"}
+ },
+ "required": ["file_path"]
+ }
)
]
@@ -168,6 +199,209 @@ def _normalize_mime_type(mime_type: Optional[str], name: Optional[str]) -> str:
return _guess_mime_type(name)
+def _normalize_header(value: Any) -> str:
+ if value is None:
+ return ""
+ return str(value).strip().lower()
+
+
+def _extract_dispimg_id(value: Any) -> str:
+ if value is None:
+ return ""
+ text = str(value).strip()
+ if not text:
+ return ""
+ match = re.search(r"dispimg\(\s*\"([^\"]+)\"", text, re.IGNORECASE)
+ if match:
+ return match.group(1).strip()
+ match = re.search(r"dispimg\(\s*'([^']+)'", text, re.IGNORECASE)
+ if match:
+ return match.group(1).strip()
+ return text
+
+
+def _resolve_dispimg_temp_file(excel_path: str, image_key: str) -> str:
+ image_id = _extract_dispimg_id(image_key)
+ if not image_id:
+ raise ValueError("missing dispimg id")
+ with zipfile.ZipFile(excel_path) as zip_ref:
+ names = set(zip_ref.namelist())
+ if "xl/cellimages.xml" not in names:
+ raise ValueError("cellimages.xml not found in excel")
+ if "xl/_rels/cellimages.xml.rels" not in names:
+ raise ValueError("cellimages.xml.rels not found in excel")
+ rels_root = ET.fromstring(zip_ref.read("xl/_rels/cellimages.xml.rels"))
+ rel_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
+ rels_map: Dict[str, str] = {}
+ for rel in rels_root.findall(f"{{{rel_ns}}}Relationship"):
+ rel_id = rel.attrib.get("Id")
+ target = rel.attrib.get("Target")
+ if rel_id and target:
+ rels_map[rel_id] = target
+ cell_root = ET.fromstring(zip_ref.read("xl/cellimages.xml"))
+ namespaces = {
+ "etc": "http://www.wps.cn/officeDocument/2017/etCustomData",
+ "xdr": "http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing",
+ "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
+ "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
+ }
+ name_to_embed: Dict[str, str] = {}
+ for cell_image in cell_root.findall("etc:cellImage", namespaces):
+ c_nv_pr = cell_image.find(".//xdr:cNvPr", namespaces)
+ blip = cell_image.find(".//a:blip", namespaces)
+ name = c_nv_pr.attrib.get("name") if c_nv_pr is not None else ""
+ embed_id = blip.attrib.get(f"{{{namespaces['r']}}}embed") if blip is not None else ""
+ if name and embed_id:
+ name_to_embed[name] = embed_id
+ if not name_to_embed:
+ raise ValueError("no cell images found in excel")
+ candidates = [image_id]
+ if image_id.startswith("ID_"):
+ candidates.append(image_id[3:])
+ else:
+ candidates.append(f"ID_{image_id}")
+ name_to_embed_lower = {key.lower(): value for key, value in name_to_embed.items()}
+ embed_id = ""
+ for candidate in candidates:
+ if candidate in name_to_embed:
+ embed_id = name_to_embed[candidate]
+ break
+ lower_candidate = candidate.lower()
+ if lower_candidate in name_to_embed_lower:
+ embed_id = name_to_embed_lower[lower_candidate]
+ break
+ if not embed_id:
+ raise ValueError("dispimg id not found in excel")
+ target = rels_map.get(embed_id)
+ if not target:
+ raise ValueError("dispimg image target not found in excel")
+ target_path = f"xl/{target.lstrip('/')}"
+ if target_path not in names:
+ raise ValueError("dispimg image file missing in excel")
+ suffix = Path(target).suffix
+ with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
+ temp_path = Path(temp_file.name)
+ temp_path.write_bytes(zip_ref.read(target_path))
+ return str(temp_path)
+
+
+def _resolve_image_source_from_excel(excel_path: str, image_key: str) -> str:
+ from openpyxl import load_workbook
+
+ workbook = load_workbook(excel_path, read_only=True, data_only=True)
+ try:
+ worksheet = workbook.worksheets[0]
+ header_row = next(worksheet.iter_rows(min_row=1, max_row=1, values_only=True), None)
+ if not header_row:
+ raise ValueError("excel header row is empty")
+ header_map: Dict[str, int] = {}
+ for index, header in enumerate(header_row):
+ name = _normalize_header(header)
+ if name:
+ header_map[name] = index
+ key_col_index = None
+ for key_name in ("image_key", "imagekey", "key"):
+ if key_name in header_map:
+ key_col_index = header_map[key_name]
+ break
+ if key_col_index is None:
+ raise ValueError("missing image_key column in excel")
+ candidate_columns = ["image_path", "image_url", "image", "url", "file_path", "path", "image_file"]
+ candidate_indices = [header_map[name] for name in candidate_columns if name in header_map]
+ matched_source = ""
+ for row in worksheet.iter_rows(min_row=2, values_only=True):
+ if key_col_index >= len(row):
+ continue
+ cell_value = row[key_col_index]
+ if cell_value is None:
+ continue
+ if str(cell_value).strip() != image_key:
+ continue
+ for index in candidate_indices:
+ if index < len(row):
+ candidate_value = row[index]
+ if candidate_value is not None and str(candidate_value).strip():
+ matched_source = str(candidate_value).strip()
+ break
+ if not matched_source:
+ matched_source = str(cell_value).strip()
+ break
+ if not matched_source:
+ raise ValueError(f"missing image source for image_key: {image_key}")
+ if not _is_url(matched_source):
+ if not Path(matched_source).is_absolute():
+ matched_source = str(Path(excel_path).parent / matched_source)
+ return matched_source
+ finally:
+ workbook.close()
+
+
+def _build_local_temp_file(file_path: str) -> str:
+ source_path = Path(file_path)
+ if not source_path.is_file():
+ raise FileNotFoundError(f"image file not found: {file_path}")
+ suffix = source_path.suffix
+ with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
+ temp_path = Path(temp_file.name)
+ temp_path.write_bytes(source_path.read_bytes())
+ return str(temp_path)
+
+
+def _normalize_minio_endpoint(endpoint: str) -> Tuple[str, Optional[bool], str]:
+ raw = str(endpoint).strip()
+ if not raw:
+ return "", None, ""
+ if raw.startswith("http://") or raw.startswith("https://"):
+ parsed = urlparse(raw)
+ secure = parsed.scheme == "https"
+ return parsed.netloc, secure, f"{parsed.scheme}://{parsed.netloc}"
+ return raw, None, f"http://{raw}"
+
+
+def _hash_file_md5(file_path: str) -> str:
+ hasher = hashlib.md5()
+ with open(file_path, "rb") as handle:
+ for chunk in iter(lambda: handle.read(1024 * 1024), b""):
+ hasher.update(chunk)
+ return hasher.hexdigest()
+
+
+def _upload_file_to_minio_sync(file_path: str) -> str:
+ if not file_path:
+ raise ValueError("missing file_path")
+ source_path = Path(file_path)
+ if not source_path.is_file():
+ raise FileNotFoundError(f"file_path not found: {file_path}")
+ endpoint_raw = Config.MINIO_ENDPOINT
+ access_key = Config.MINIO_ACCESS_KEY
+ secret_key = Config.MINIO_SECRET_KEY
+ if not endpoint_raw:
+ raise ValueError("missing minio_endpoint")
+ if not access_key:
+ raise ValueError("missing minio_access_key")
+ if not secret_key:
+ raise ValueError("missing minio_secret_key")
+ endpoint, endpoint_secure, endpoint_base = _normalize_minio_endpoint(endpoint_raw)
+ if not endpoint:
+ raise ValueError("invalid minio_endpoint")
+ secure = endpoint_secure or False
+ client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure)
+ bucket = "lzwcai"
+ prefix = "tmp"
+ if not client.bucket_exists(bucket):
+ client.make_bucket(bucket)
+ date_str = datetime.now().strftime("%Y-%m-%d")
+ file_hash = _hash_file_md5(file_path)
+ suffix = source_path.suffix
+ object_name = f"{prefix}/{date_str}/{file_hash}/{file_hash}{suffix}"
+ content_type = _guess_mime_type(source_path.name)
+ client.fput_object(bucket, object_name, file_path, content_type=content_type)
+ scheme = "https" if secure else "http"
+ public_base = endpoint_base if endpoint_base else f"{scheme}://{endpoint}"
+ public_base = f"{public_base.rstrip('/')}/{bucket}"
+ return f"{public_base}/{object_name}"
+
+
def _build_file_json(arguments: Dict[str, Any]) -> str:
file_info, data, name = _extract_file_payload(arguments)
if file_info["type"] is None:
@@ -249,6 +483,42 @@ async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextCon
result = await _build_url_data_uri(arguments)
elif name == "url_to_temp_file":
result = await _build_url_file_path(arguments)
+ elif name == "excel_image_key_to_temp_file":
+ excel_path = arguments.get("excel_path")
+ image_key = arguments.get("image_key")
+ if not excel_path:
+ raise ValueError("missing excel_path")
+ if not image_key:
+ raise ValueError("missing image_key")
+ excel_path = str(excel_path).strip()
+ image_key = str(image_key).strip()
+ path = Path(excel_path)
+ if not path.exists():
+ raise FileNotFoundError(f"excel_path not found: {excel_path}")
+ if path.suffix.lower() in (".xls",):
+ raise ValueError("xls is not supported, please convert to xlsx")
+ if path.suffix.lower() not in (".xlsx", ".xlsm", ".xltx", ".xltm"):
+ raise ValueError("excel_path must be xlsx format")
+ dispimg_error: Optional[Exception] = None
+ table_error: Optional[Exception] = None
+ try:
+ result = _resolve_dispimg_temp_file(excel_path, image_key)
+ except Exception as exc:
+ dispimg_error = exc
+ try:
+ image_source = _resolve_image_source_from_excel(excel_path, image_key)
+ if _is_url(image_source):
+ result = await _build_url_file_path({"url": image_source})
+ else:
+ result = _build_local_temp_file(image_source)
+ except Exception as exc2:
+ table_error = exc2
+ raise ValueError(f"excel image not found: dispimg={dispimg_error}; table={table_error}")
+ elif name == "upload_file_to_minio":
+ file_path = str(arguments.get("file_path", "")).strip()
+ if not file_path:
+ raise ValueError("missing file_path")
+ result = await asyncio.to_thread(_upload_file_to_minio_sync, file_path)
else:
raise ValueError(f"unknown tool name: {name}")
return [TextContent(type="text", text=result)]
diff --git a/file_tools/mcp-server-file-tools.json b/file_tools/mcp-server-file-tools.json
index 0a49d92..f7fe1fb 100644
--- a/file_tools/mcp-server-file-tools.json
+++ b/file_tools/mcp-server-file-tools.json
@@ -8,7 +8,11 @@
"args": [
"lzwcai-mcpskills-file-tools-mcp"
],
- "env": {}
+ "env": {
+ "minio_endpoint": "http://sceneminios3.lzwcai.com:9000",
+ "minio_access_key": "TgPBBz0OdlvEVzG3",
+ "minio_secret_key": "AgpliEB6L7UWXXeBaAN0gL4xiRCGCE03"
+ }
}
}
}
diff --git a/file_tools/pyproject.toml b/file_tools/pyproject.toml
index f2a715a..fb51064 100644
--- a/file_tools/pyproject.toml
+++ b/file_tools/pyproject.toml
@@ -4,12 +4,14 @@ build-backend = "hatchling.build"
[project]
name = "lzwcai-mcpskills-file-tools-mcp"
-version = "0.1.9"
+version = "0.1.12"
description = "File tools MCP server"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.6.0",
- "httpx>=0.24.0"
+ "openpyxl>=3.1.2",
+ "httpx>=0.24.0",
+ "minio>=7.2.0"
]
[project.scripts]
@@ -17,4 +19,3 @@ lzwcai-mcpskills-file-tools-mcp = "file_tools.main:main"
[tool.hatch.build.targets.wheel]
packages = ["file_tools"]
-
diff --git a/lzwcai_lark_mcp/card.txt b/lzwcai_lark_mcp/card.txt
new file mode 100644
index 0000000..44d8395
--- /dev/null
+++ b/lzwcai_lark_mcp/card.txt
@@ -0,0 +1,237 @@
+{
+ "schema": "2.0",
+ "config": {
+ "update_multi": true,
+ "style": {
+ "text_size": {
+ "normal_v2": {
+ "default": "normal",
+ "pc": "normal",
+ "mobile": "heading"
+ }
+ }
+ }
+ },
+ "body": {
+ "direction": "vertical",
+ "elements": [
+ {
+ "tag": "column_set",
+ "flex_mode": "stretch",
+ "horizontal_spacing": "12px",
+ "horizontal_align": "left",
+ "columns": [
+ {
+ "tag": "column",
+ "width": "weighted",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": "**用户:** ",
+ "text_align": "left",
+ "text_size": "normal"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "weight": 1
+ },
+ {
+ "tag": "column",
+ "width": "weighted",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": "**发生时间:** ${change_time}",
+ "text_align": "left",
+ "text_size": "normal"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "weight": 1
+ }
+ ],
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "hr",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "form",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": "**\\*请准确选择属于您的变动项:**",
+ "text_align": "left",
+ "text_size": "normal",
+ "margin": "0px 0px 8px 0px"
+ },
+ {
+ "tag": "multi_select_static",
+ "placeholder": {
+ "tag": "plain_text",
+ "content": "请选择资产变动项"
+ },
+ "options": [
+ {
+ "text": {
+ "tag": "plain_text",
+ "content": "资产项1"
+ },
+ "value": "item1"
+ },
+ {
+ "text": {
+ "tag": "plain_text",
+ "content": "资产项2"
+ },
+ "value": "item2"
+ }
+ ],
+ "type": "default",
+ "width": "fill",
+ "required": false,
+ "name": "asset_changes",
+ "margin": "0px 0px 16px 0px",
+ "element_id": "cIiptD7Z4hCtAeR5Rb0b"
+ },
+ {
+ "tag": "markdown",
+ "content": "**其他说明:**",
+ "text_align": "left",
+ "text_size": "normal_v2",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "input",
+ "placeholder": {
+ "tag": "plain_text",
+ "content": "请输入"
+ },
+ "default_value": "",
+ "width": "fill",
+ "name": "Input_3h27n7woqci",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "column_set",
+ "flex_mode": "flow",
+ "horizontal_spacing": "8px",
+ "horizontal_align": "right",
+ "columns": [
+ {
+ "tag": "column",
+ "width": "auto",
+ "elements": [
+ {
+ "tag": "button",
+ "text": {
+ "tag": "plain_text",
+ "content": "确认"
+ },
+ "type": "primary_filled",
+ "width": "default",
+ "size": "medium",
+ "behaviors": [
+ {
+ "type": "callback",
+ "value": {
+ "action": "confirm_asset_changes"
+ }
+ }
+ ],
+ "form_action_type": "submit",
+ "name": "confirm_button",
+ "margin": "4px 0px 4px 0px"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top"
+ },
+ {
+ "tag": "column",
+ "width": "auto",
+ "elements": [
+ {
+ "tag": "button",
+ "text": {
+ "tag": "plain_text",
+ "content": "反馈问题"
+ },
+ "type": "default",
+ "width": "default",
+ "confirm": {
+ "title": {
+ "tag": "plain_text",
+ "content": "反馈并废除该确认单吗?"
+ },
+ "text": {
+ "tag": "plain_text",
+ "content": "${remark}"
+ }
+ },
+ "behaviors": [
+ {
+ "type": "callback",
+ "value": {
+ "": ""
+ }
+ }
+ ],
+ "form_action_type": "submit",
+ "name": "feedback_button",
+ "margin": "4px 0px 4px 0px"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top"
+ }
+ ]
+ }
+ ],
+ "direction": "vertical",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "padding": "12px 12px 12px 12px",
+ "margin": "0px 0px 0px 0px",
+ "name": "asset_confirmation_form"
+ },
+ {
+ "tag": "hr",
+ "margin": "0px 0px 0px 0px"
+ }
+ ]
+ },
+ "header": {
+ "title": {
+ "tag": "plain_text",
+ "content": "资产变动单"
+ },
+ "subtitle": {
+ "tag": "plain_text",
+ "content": ""
+ },
+ "text_tag_list": [
+ {
+ "tag": "text_tag",
+ "text": {
+ "tag": "plain_text",
+ "content": "待确认"
+ },
+ "color": "orange"
+ }
+ ],
+ "template": "blue",
+ "icon": {
+ "tag": "standard_icon",
+ "token": "googledrive_outlined"
+ },
+ "padding": "12px 8px 12px 8px"
+ }
+}
\ No newline at end of file
diff --git a/lzwcai_lark_mcp/lzwcai_lark_mcp/tools.py b/lzwcai_lark_mcp/lzwcai_lark_mcp/tools.py
index 60533bf..4592134 100644
--- a/lzwcai_lark_mcp/lzwcai_lark_mcp/tools.py
+++ b/lzwcai_lark_mcp/lzwcai_lark_mcp/tools.py
@@ -28,6 +28,28 @@ tools = [
"required": ["url"]
}
),
+ Tool(
+ name="upload_image_by_excel",
+ description="从Excel中根据image_key定位图片并上传返回image_key",
+ inputSchema={
+ "type": "object",
+ "properties": {
+ "excel_path": {
+ "type": "string",
+ "description": "Excel文件路径"
+ },
+ "image_key": {
+ "type": "string",
+ "description": "Excel中的image_key"
+ },
+ "image_type": {
+ "type": "string",
+ "description": "图片类型,例如 message"
+ }
+ },
+ "required": ["excel_path", "image_key"]
+ }
+ ),
Tool(
name="send_card_message",
description="发送消息卡片,入参为receiver_ids、person_id和image_key",
@@ -52,6 +74,38 @@ tools = [
},
"required": ["image_key", "person_id"]
}
+ ),
+ Tool(
+ name="send_asset_confirmation_card",
+ description="发送资产变动确认卡片,入参为user_id、time、inputs和outputs",
+ inputSchema={
+ "type": "object",
+ "properties": {
+ "user_id": {
+ "type": "string",
+ "description": "消息接收者ID,同时用于卡片内person组件"
+ },
+ "time": {
+ "type": "string",
+ "description": "资产变动发生时间"
+ },
+ "inputs": {
+ "type": "array",
+ "description": "入库资产列表",
+ "items": {
+ "type": "string"
+ }
+ },
+ "outputs": {
+ "type": "array",
+ "description": "出库资产列表",
+ "items": {
+ "type": "string"
+ }
+ }
+ },
+ "required": ["user_id", "time"]
+ }
)
]
@@ -94,6 +148,400 @@ def upload_image_by_url(token: str, url: str, image_type: str) -> str:
return image_key
+def upload_image_by_file(token: str, file_path: str, image_type: str) -> str:
+ if not os.path.isfile(file_path):
+ raise FileNotFoundError(f"image file not found: {file_path}")
+ filename = os.path.basename(file_path) or "image"
+ content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
+ with open(file_path, "rb") as handle:
+ content = handle.read()
+ headers = {"Authorization": f"Bearer {token}"}
+ data = {"image_type": image_type}
+ files = {"image": (filename, content, content_type)}
+ response = requests.post(
+ "https://open.feishu.cn/open-apis/im/v1/images",
+ headers=headers,
+ data=data,
+ files=files,
+ timeout=30
+ )
+ response.raise_for_status()
+ payload = response.json()
+ if payload.get("code") not in (0, None):
+ raise RuntimeError(f"lark image upload failed: {payload}")
+ image_key = payload.get("data", {}).get("image_key")
+ if not image_key:
+ raise RuntimeError(f"lark image upload missing image_key: {payload}")
+ return image_key
+
+
+def _is_url(value: str) -> bool:
+ parsed = urlparse(value)
+ return parsed.scheme in ("http", "https")
+
+
+def _normalize_header(value: object) -> str:
+ if value is None:
+ return ""
+ return str(value).strip().lower()
+
+
+def _resolve_image_source_from_excel(excel_path: str, image_key: str) -> str:
+ from openpyxl import load_workbook
+
+ workbook = load_workbook(excel_path, read_only=True, data_only=True)
+ worksheet = workbook.worksheets[0]
+ header_row = next(worksheet.iter_rows(min_row=1, max_row=1, values_only=True), None)
+ if not header_row:
+ raise ValueError("excel header row is empty")
+ header_map = {}
+ for index, header in enumerate(header_row):
+ name = _normalize_header(header)
+ if name:
+ header_map[name] = index
+ key_col_index = None
+ for key_name in ("image_key", "imagekey", "key"):
+ if key_name in header_map:
+ key_col_index = header_map[key_name]
+ break
+ if key_col_index is None:
+ raise ValueError("missing image_key column in excel")
+ candidate_columns = ["image_path", "image_url", "image", "url", "file_path", "path", "image_file"]
+ candidate_indices = [header_map[name] for name in candidate_columns if name in header_map]
+ matched_source = ""
+ for row in worksheet.iter_rows(min_row=2, values_only=True):
+ if key_col_index >= len(row):
+ continue
+ cell_value = row[key_col_index]
+ if cell_value is None:
+ continue
+ if str(cell_value).strip() != image_key:
+ continue
+ for index in candidate_indices:
+ if index < len(row):
+ candidate_value = row[index]
+ if candidate_value is not None and str(candidate_value).strip():
+ matched_source = str(candidate_value).strip()
+ break
+ if not matched_source:
+ matched_source = str(cell_value).strip()
+ break
+ if not matched_source:
+ raise ValueError(f"missing image source for image_key: {image_key}")
+ if not _is_url(matched_source):
+ if not os.path.isabs(matched_source):
+ matched_source = os.path.join(os.path.dirname(excel_path), matched_source)
+ return matched_source
+
+
+def _normalize_asset_items(value: object) -> List[str]:
+ if value is None:
+ return []
+ if isinstance(value, str):
+ segments = [segment.strip() for segment in re.split(r"[,,、;;]+", value)]
+ return [segment for segment in segments if segment]
+ if isinstance(value, list):
+ items: List[str] = []
+ for item in value:
+ items.extend(_normalize_asset_items(item))
+ return items
+ if isinstance(value, dict):
+ if "inputs" in value:
+ return _normalize_asset_items(value.get("inputs"))
+ if "outputs" in value:
+ return _normalize_asset_items(value.get("outputs"))
+ if "name" in value:
+ return _normalize_asset_items(value.get("name"))
+ items: List[str] = []
+ for item in value.values():
+ items.extend(_normalize_asset_items(item))
+ return items
+ text = str(value).strip()
+ return [text] if text else []
+
+
+def _build_asset_options(inputs: List[str], outputs: List[str]) -> tuple[list[Dict[str, object]], str]:
+ labels = []
+ options = []
+ for item in outputs:
+ label = f"{item}(出)"
+ labels.append(label)
+ options.append({
+ "text": {"tag": "plain_text", "content": label},
+ "value": f"{item}|出"
+ })
+ for item in inputs:
+ label = f"{item}(入)"
+ labels.append(label)
+ options.append({
+ "text": {"tag": "plain_text", "content": label},
+ "value": f"{item}|入"
+ })
+ return options, "、".join(labels)
+
+
+def send_asset_confirmation_card(token: str, user_id: str, change_time: str, inputs: object, outputs: object) -> str:
+ normalized_user_id = str(user_id).strip()
+ if not normalized_user_id:
+ raise ValueError("missing user_id")
+ normalized_time = str(change_time or "").strip()
+ if not normalized_time:
+ normalized_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ input_items = _normalize_asset_items(inputs)
+ output_items = _normalize_asset_items(outputs)
+ options, summary = _build_asset_options(input_items, output_items)
+ content = {
+ "schema": "2.0",
+ "config": {
+ "update_multi": True,
+ "style": {
+ "text_size": {
+ "normal_v2": {
+ "default": "normal",
+ "pc": "normal",
+ "mobile": "heading"
+ }
+ }
+ }
+ },
+ "body": {
+ "direction": "vertical",
+ "elements": [
+ {
+ "tag": "column_set",
+ "flex_mode": "stretch",
+ "horizontal_spacing": "12px",
+ "horizontal_align": "left",
+ "columns": [
+ {
+ "tag": "column",
+ "width": "weighted",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": f"**用户:** ",
+ "text_align": "left",
+ "text_size": "normal"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "weight": 1
+ },
+ {
+ "tag": "column",
+ "width": "weighted",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": f"**发生时间:** {normalized_time}",
+ "text_align": "left",
+ "text_size": "normal"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "weight": 1
+ }
+ ],
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "hr",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "form",
+ "elements": [
+ {
+ "tag": "markdown",
+ "content": "**\\*请准确选择属于您的变动项:**",
+ "text_align": "left",
+ "text_size": "normal",
+ "margin": "0px 0px 8px 0px"
+ },
+ {
+ "tag": "multi_select_static",
+ "placeholder": {
+ "tag": "plain_text",
+ "content": "请选择资产变动项"
+ },
+ "options": options,
+ "type": "default",
+ "width": "fill",
+ "required": False,
+ "name": "asset_changes",
+ "margin": "0px 0px 16px 0px",
+ "element_id": "cIiptD7Z4hCtAeR5Rb0b"
+ },
+ {
+ "tag": "markdown",
+ "content": "**其他说明:**",
+ "text_align": "left",
+ "text_size": "normal_v2",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "input",
+ "placeholder": {
+ "tag": "plain_text",
+ "content": "请输入"
+ },
+ "default_value": "",
+ "width": "fill",
+ "name": "Input_3h27n7woqci",
+ "margin": "0px 0px 0px 0px"
+ },
+ {
+ "tag": "column_set",
+ "flex_mode": "flow",
+ "horizontal_spacing": "8px",
+ "horizontal_align": "right",
+ "columns": [
+ {
+ "tag": "column",
+ "width": "auto",
+ "elements": [
+ {
+ "tag": "button",
+ "text": {
+ "tag": "plain_text",
+ "content": "确认"
+ },
+ "type": "primary_filled",
+ "width": "default",
+ "size": "medium",
+ "behaviors": [
+ {
+ "type": "callback",
+ "value": {
+ "action": "confirm_asset_changes"
+ }
+ }
+ ],
+ "form_action_type": "submit",
+ "name": "confirm_button",
+ "margin": "4px 0px 4px 0px"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top"
+ },
+ {
+ "tag": "column",
+ "width": "auto",
+ "elements": [
+ {
+ "tag": "button",
+ "text": {
+ "tag": "plain_text",
+ "content": "反馈问题"
+ },
+ "type": "default",
+ "width": "default",
+ "confirm": {
+ "title": {
+ "tag": "plain_text",
+ "content": "反馈并废除该确认单吗?"
+ },
+ "text": {
+ "tag": "plain_text",
+ "content": summary
+ }
+ },
+ "behaviors": [
+ {
+ "type": "callback",
+ "value": {
+ "": ""
+ }
+ }
+ ],
+ "form_action_type": "submit",
+ "name": "feedback_button",
+ "margin": "4px 0px 4px 0px"
+ }
+ ],
+ "vertical_spacing": "8px",
+ "horizontal_align": "left",
+ "vertical_align": "top"
+ }
+ ]
+ }
+ ],
+ "direction": "vertical",
+ "horizontal_align": "left",
+ "vertical_align": "top",
+ "padding": "12px 12px 12px 12px",
+ "margin": "0px 0px 0px 0px",
+ "name": "asset_confirmation_form"
+ },
+ {
+ "tag": "hr",
+ "margin": "0px 0px 0px 0px"
+ }
+ ]
+ },
+ "header": {
+ "title": {
+ "tag": "plain_text",
+ "content": "资产变动单"
+ },
+ "subtitle": {
+ "tag": "plain_text",
+ "content": ""
+ },
+ "text_tag_list": [
+ {
+ "tag": "text_tag",
+ "text": {
+ "tag": "plain_text",
+ "content": "待确认"
+ },
+ "color": "orange"
+ }
+ ],
+ "template": "blue",
+ "icon": {
+ "tag": "standard_icon",
+ "token": "googledrive_outlined"
+ },
+ "padding": "12px 8px 12px 8px"
+ }
+ }
+ payload = {
+ "receive_id": normalized_user_id,
+ "msg_type": "interactive",
+ "content": json.dumps(content, ensure_ascii=False)
+ }
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json"
+ }
+ response = requests.post(
+ "https://open.feishu.cn/open-apis/im/v1/messages",
+ params={"receive_id_type": "user_id"},
+ headers=headers,
+ json=payload,
+ timeout=30
+ )
+ try:
+ data = response.json()
+ except ValueError:
+ data = {"raw": response.text}
+ if not response.ok:
+ raise RuntimeError(f"lark send message http error: {data}")
+ if data.get("code") not in (0, None):
+ raise RuntimeError(f"lark send message failed: {data}")
+ message_id = data.get("data", {}).get("message_id")
+ if not message_id:
+ raise RuntimeError(f"lark send message missing message_id: {data}")
+ return message_id
+
+
def send_card_message(token: str, receiver_id: str, person_id: str, image_key: str) -> str:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = {
@@ -261,6 +709,29 @@ async def handle_call_tool(name: str, arguments: Dict[str, object], token: str)
raise ValueError("missing url")
result = upload_image_by_url(token, url, image_type)
+ elif name == "upload_image_by_excel":
+ image_type = str(arguments.get("image_type") or "message").strip()
+ excel_path = str(arguments.get("excel_path", "")).strip()
+ image_key = str(arguments.get("image_key", "")).strip()
+ if not image_type:
+ raise ValueError("missing image_type")
+ if not excel_path:
+ raise ValueError("missing excel_path")
+ if not image_key:
+ raise ValueError("missing image_key")
+ if not os.path.exists(excel_path):
+ raise FileNotFoundError(f"excel_path not found: {excel_path}")
+ excel_extension = os.path.splitext(excel_path)[1].lower()
+ if excel_extension in (".xls",):
+ raise ValueError("xls is not supported, please convert to xlsx")
+ if excel_extension in (".xlsx", ".xlsm", ".xltx", ".xltm"):
+ image_source = _resolve_image_source_from_excel(excel_path, image_key)
+ else:
+ image_source = excel_path
+ if _is_url(image_source):
+ result = upload_image_by_url(token, image_source, image_type)
+ else:
+ result = upload_image_by_file(token, image_source, image_type)
elif name == "send_card_message":
image_key = str(arguments.get("image_key", "")).strip()
receiver_ids = arguments.get("receiver_ids")
@@ -273,6 +744,14 @@ async def handle_call_tool(name: str, arguments: Dict[str, object], token: str)
if not isinstance(receiver_ids, list):
raise ValueError("receiver_ids must be a list")
result = send_card_messages(token, receiver_ids, person_id, image_key)
+ elif name == "send_asset_confirmation_card":
+ user_id = str(arguments.get("user_id", "")).strip()
+ change_time = str(arguments.get("time", "")).strip()
+ inputs = arguments.get("inputs")
+ outputs = arguments.get("outputs")
+ if not user_id:
+ raise ValueError("missing user_id")
+ result = send_asset_confirmation_card(token, user_id, change_time, inputs, outputs)
else:
raise ValueError(f"unknown tool name: {name}")
return [TextContent(type="text", text=result)]
diff --git a/lzwcai_lark_mcp/pyproject.toml b/lzwcai_lark_mcp/pyproject.toml
index cae69c4..ae5e863 100644
--- a/lzwcai_lark_mcp/pyproject.toml
+++ b/lzwcai_lark_mcp/pyproject.toml
@@ -4,11 +4,12 @@ build-backend = "hatchling.build"
[project]
name = "lzwcai-mcpskills-lark-mcp"
-version = "0.1.4"
+version = "0.1.10"
description = "Lark MCP server"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.6.0",
+ "openpyxl>=3.1.2",
"python-dotenv>=0.21.0",
"requests>=2.25.0"
]
diff --git a/lzwcai_lark_mcp/test.py b/lzwcai_lark_mcp/test.py
index 09bedbe..70e473c 100644
--- a/lzwcai_lark_mcp/test.py
+++ b/lzwcai_lark_mcp/test.py
@@ -20,15 +20,19 @@ def main() -> None:
if not os.getenv("app_id") or not os.getenv("app_secret"):
raise RuntimeError("missing app_id or app_secret")
from lzwcai_lark_mcp.main import LarkMcpServer
- from lzwcai_lark_mcp.tools import send_card_message
+ from lzwcai_lark_mcp.tools import send_asset_confirmation_card
async def _run() -> None:
server = LarkMcpServer()
await server.ensure_token()
- image_key = "img_v3_02uq_d48b3ee1-0f89-44a3-80cd-a5afa4f8c39g"
- receiver_id = "843ga2gb"
- person_id = receiver_id
- result = send_card_message(server.tenant_access_token or "", receiver_id, person_id, image_key)
+ user_id = "843ga2gb"
+ result = send_asset_confirmation_card(
+ server.tenant_access_token or "",
+ user_id,
+ "2026-02-13 10:30:00",
+ ["华为i手机"],
+ ["红米手机"]
+ )
print(result)
asyncio.run(_run())