test: 更新测试脚本以独立测试飞书卡片发送功能

- 将 test.py 改为同步请求方式,移除对 mcp 模块的依赖
- 新增 test_send_card.py 用于测试卡片模板的加载与渲染
- 更新资产确认卡片的参数格式,支持更灵活的资产列表结构
- 改进环境变量读取逻辑,兼容新旧配置键名
This commit is contained in:
bin
2026-03-04 10:35:02 +08:00
parent fa226733b8
commit 10fbb58b70
2 changed files with 169 additions and 20 deletions

View File

@@ -1,7 +1,9 @@
import asyncio
import json import json
import os import os
import sys
import types
from pathlib import Path from pathlib import Path
import requests
def main() -> None: def main() -> None:
@@ -9,33 +11,61 @@ def main() -> None:
config_path = Path(__file__).with_name("mcp-server.json") config_path = Path(__file__).with_name("mcp-server.json")
if config_path.exists(): if config_path.exists():
config_data = json.loads(config_path.read_text(encoding="utf-8")) config_data = json.loads(config_path.read_text(encoding="utf-8"))
servers = config_data.get("mcpServers", {})
env_data = ( env_data = (
config_data.get("mcpServers", {}) servers.get("lzwcai-lark-mcp", {}).get("env", {})
.get("lzwcai-mcpskills-lark-mcp", {}) or servers.get("lzwcai-mcpskills-lark-mcp", {}).get("env", {})
.get("env", {})
) )
if env_data.get("app_id") and env_data.get("app_secret"): if env_data.get("app_id") and env_data.get("app_secret"):
os.environ["app_id"] = env_data["app_id"] os.environ["app_id"] = env_data["app_id"]
os.environ["app_secret"] = env_data["app_secret"] os.environ["app_secret"] = env_data["app_secret"]
if not os.getenv("app_id") or not os.getenv("app_secret"): if not os.getenv("app_id") or not os.getenv("app_secret"):
raise RuntimeError("missing app_id or app_secret") raise RuntimeError("missing app_id or app_secret")
from lzwcai_lark_mcp.main import LarkMcpServer if "mcp" not in sys.modules:
mcp_module = types.ModuleType("mcp")
types_module = types.ModuleType("mcp.types")
class Tool:
def __init__(self, *args, **kwargs):
pass
class TextContent:
def __init__(self, *args, **kwargs):
pass
types_module.Tool = Tool
types_module.TextContent = TextContent
mcp_module.types = types_module
sys.modules["mcp"] = mcp_module
sys.modules["mcp.types"] = types_module
from lzwcai_lark_mcp.tools import send_asset_confirmation_card from lzwcai_lark_mcp.tools import send_asset_confirmation_card
app_id = os.getenv("app_id", "")
async def _run() -> None: app_secret = os.getenv("app_secret", "")
server = LarkMcpServer() auth_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
await server.ensure_token() response = requests.post(
user_id = "843ga2gb" auth_url,
result = send_asset_confirmation_card( json={"app_id": app_id, "app_secret": app_secret},
server.tenant_access_token or "", headers={"Content-Type": "application/json"},
user_id, timeout=10
"2026-02-13 10:30:00", )
["华为i手机"], response.raise_for_status()
["红米手机"] data = response.json()
) if data.get("code") not in (0, None):
print(result) raise RuntimeError(f"lark auth failed: {data}")
token = data.get("tenant_access_token", "")
asyncio.run(_run()) if not token:
raise RuntimeError(f"lark auth response missing token: {data}")
user_id = "gegg1d78"
result = send_asset_confirmation_card(
token,
user_id,
"CONF-20260301-001",
"2026-03-01 10:30:00",
[
{"华为手机": "huawei_phone"},
{"红米手机": "redmi_phone"},
"MacBook Pro"
],
"如有误报请点击反馈"
)
print(result)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,119 @@
import lark_oapi as lark
from lark_oapi.api.im.v1 import *
import json
import os
from datetime import datetime
# 配置你的 App ID 和 App Secret
APP_ID = 'cli_a8d0e0c140169013'
APP_SECRET = 'yEc0E8Aoo8Mo9NPPzphidez51xB71HXW'
# 你的 Open ID (请确保这个 ID 是正确的)
RECEIVE_ID = "ou_5c041720bc5a15235d6026ef118d77c9"
RECEIVE_ID_TYPE = "open_id"
# 卡片 JSON 文件路径
CARD_JSON_PATH = r"/home/lzwc/project/warehouse/origin_scripts/卡片源代码(供参考,禁止直接改动).json"
def load_and_render_card():
# 1. 读取 JSON 文件
with open(CARD_JSON_PATH, "r", encoding="utf-8") as f:
card_content = f.read()
# 2. 准备替换的数据
# 注意:简单的字符串替换无法处理 "${asset_list}" 这种需要替换为 JSON 数组的情况
# 所以我们需要先解析 JSON再遍历替换或者用更巧妙的方法
# 构造选项列表
asset_options = [
{"text": {"tag": "plain_text", "content": "显示器"}, "value": "monitor"},
{"text": {"tag": "plain_text", "content": "键盘"}, "value": "keyboard"},
{"text": {"tag": "plain_text", "content": "鼠标"}, "value": "mouse"}
]
# 这里我们采用一种混合策略:先替换简单的字符串变量,再解析 JSON 替换复杂对象
# 替换简单变量
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
card_content = card_content.replace("${order_number}", "ORD-TEST-001")
card_content = card_content.replace("${user_id}", RECEIVE_ID)
card_content = card_content.replace("${change_time}", current_time)
card_content = card_content.replace("${remark}", "如果不属实,请点击此按钮反馈")
# 解析为 Python 对象
card_json = json.loads(card_content)
# 3. 替换复杂对象 (options)
# 我们需要找到那个 multi_select_static 组件并替换它的 options
# 同时,我们需要将 order_number 注入到按钮的 value 中,以便回调时能获取到
order_number_val = "ORD-TEST-001"
try:
# 递归查找并替换 options="${asset_list}" 以及注入 order_number
def process_nodes(node):
if isinstance(node, dict):
# Check for options replacement
for key, value in node.items():
if key == "options" and value == "${asset_list}":
node[key] = asset_options
# Check for button behaviors
if node.get("tag") == "button":
behaviors = node.get("behaviors", [])
for behavior in behaviors:
if behavior.get("type") == "callback" and "value" in behavior:
# Inject order_number into the callback value
if isinstance(behavior["value"], dict):
behavior["value"]["order_number"] = order_number_val
# Recursively process children
for key, value in node.items():
process_nodes(value)
elif isinstance(node, list):
for item in node:
process_nodes(item)
process_nodes(card_json)
except Exception as e:
print(f"替换变量失败: {e}")
return None
return card_json
def main():
# 加载并渲染卡片
card_json = load_and_render_card()
if not card_json:
return
# 创建 Client
client = lark.Client.builder() \
.app_id(APP_ID) \
.app_secret(APP_SECRET) \
.log_level(lark.LogLevel.DEBUG) \
.build()
# 构造请求
request = CreateMessageRequest.builder() \
.receive_id_type(RECEIVE_ID_TYPE) \
.request_body(CreateMessageRequestBody.builder()
.receive_id(RECEIVE_ID)
.msg_type("interactive")
.content(json.dumps(card_json)) # 这里再次序列化为字符串
.build()) \
.build()
# 发送请求
response = client.im.v1.message.create(request)
# 处理响应
if not response.success():
print(f"发送失败: code: {response.code}, msg: {response.msg}, error: {response.error}")
return
print(f"发送成功! message_id: {response.data.message_id}")
if __name__ == "__main__":
main()