feat(lzwcai-demp-tool-server-dify-to-mcp): 初始化 Dify 集成工具模块

新增 Dify 到 MCP 的集成工具,支持通过 Dify API 将模型部署到 MCP 平台并进行推理。
该模块包含完整的服务器实现、依赖配置和命令行启动脚本。

主要功能:
- 支持 Workflow 和 Completion 模式的调用
- 自动翻译工具名称为驼峰命名格式
- 提供文件上传与任务停止接口
- 兼容流式与非流式响应处理
This commit is contained in:
2025-12-16 17:52:04 +08:00
commit ec7e7fd7dc
179 changed files with 18443 additions and 0 deletions

View File

@@ -0,0 +1,12 @@
Metadata-Version: 2.4
Name: lzwcai-demp-tool-server-dify-to-mcp
Version: 0.1.0
Summary: 这是一个Dify to MCP的集成工具通过Dify的API接口将Dify的模型部署到MCP平台并进行推理。
Requires-Python: >=3.10
Requires-Dist: httpx>=0.28.1
Requires-Dist: mcp>=1.1.2
Requires-Dist: omegaconf>=2.3.0
Requires-Dist: pip>=24.3.1
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: requests
Requires-Dist: pypinyin>=0.54.0

View File

@@ -0,0 +1,37 @@
# lzwcai-mcp-server-package
#### 介绍
lzwcai-mcp-server-package
#### 软件架构
软件架构说明
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
#### 使用说明
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

View File

@@ -0,0 +1,12 @@
Metadata-Version: 2.4
Name: lzwcai-demp-tool-server-dify-to-mcp
Version: 0.1.4
Summary: 这是一个Dify to MCP的集成工具通过Dify的API接口将Dify的模型部署到MCP平台并进行推理。
Requires-Python: >=3.10
Requires-Dist: httpx>=0.28.1
Requires-Dist: mcp>=1.1.2
Requires-Dist: omegaconf>=2.3.0
Requires-Dist: pip>=24.3.1
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: requests
Requires-Dist: pypinyin>=0.54.0

View File

@@ -0,0 +1,23 @@
README.md
pyproject.toml
setup.cfg
lzwcai_demp_tool_server_dify_to_mcp.egg-info/PKG-INFO
lzwcai_demp_tool_server_dify_to_mcp.egg-info/SOURCES.txt
lzwcai_demp_tool_server_dify_to_mcp.egg-info/dependency_links.txt
lzwcai_demp_tool_server_dify_to_mcp.egg-info/entry_points.txt
lzwcai_demp_tool_server_dify_to_mcp.egg-info/requires.txt
lzwcai_demp_tool_server_dify_to_mcp.egg-info/top_level.txt
src/__init__.py
src/create_mcp.py
src/create_mcp_util.py
src/chat/__init__.py
src/chat/chat_server.py
src/completion/completion_server.py
src/completion/test.py
src/core/__init__.py
src/core/core_server.py
src/difyTaskCall/task_instance.py
src/utils/tool_translation.py
src/utils/translator.py
src/workflow/__init__.py
src/workflow/workflow_server.py

View File

@@ -0,0 +1,2 @@
[console_scripts]
lzwcai-demp-tool-server-dify-to-mcp = src.create_mcp:run_main

View File

@@ -0,0 +1,7 @@
httpx>=0.28.1
mcp>=1.1.2
omegaconf>=2.3.0
pip>=24.3.1
python-dotenv>=1.0.1
requests
pypinyin>=0.54.0

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
主入口文件
用于启动 Dify MCP 服务器,并配置命令行参数
"""
import os
import sys
# Mock 配置参数
def setup_mock_arguments():
"""
设置模拟命令行参数
这些参数可以根据实际需求进行修改
"""
# 默认配置
default_config = {
"base_url": "http://192.168.2.236:3001/v1",
"app_sks": ["app-YFHByB4whARWVqXN2LcuPudq"],
"mode_type": "workflow",
"transport": "stdio"
}
# 如果没有提供命令行参数,则添加默认参数
if len(sys.argv) == 1:
sys.argv.extend([
"--base-url", default_config["base_url"],
"--app-sks", *default_config["app_sks"],
"--mode-type", default_config["mode_type"]
])
return default_config
def main():
"""
主函数:设置命令行参数并启动服务器
"""
# 设置模拟命令行参数
config = setup_mock_arguments()
# 导入并运行 MCP 服务器
try:
from src.create_mcp import run_main
# 获取传输模式
transport_mode = config.get("transport", "stdio")
# 运行服务器(不输出额外信息,避免干扰 STDIO 通信)
run_main(transport=transport_mode)
except ImportError as e:
print(f"[ERROR] 导入错误: {e}", file=sys.stderr)
print("请确保已正确安装所有依赖包", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"[ERROR] 运行错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,32 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "lzwcai-demp-tool-server-dify-to-mcp"
version = "0.1.4"
description = "这是一个Dify to MCP的集成工具通过Dify的API接口将Dify的模型部署到MCP平台并进行推理。"
requires-python = ">=3.10"
dependencies = [
"httpx>=0.28.1",
"mcp>=1.1.2",
"omegaconf>=2.3.0",
"pip>=24.3.1",
"python-dotenv>=1.0.1",
"requests",
"pypinyin>=0.54.0",
]
[tool.setuptools]
packages = {find = {where = ["."], include = ["src*"]}}
include-package-data = true
[project.scripts]
lzwcai-demp-tool-server-dify-to-mcp = "src.create_mcp:run_main"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.setuptools.package-data]
"*" = ["*.env"]
"src" = ["**/*.env"]

View File

@@ -0,0 +1,4 @@
[egg_info]
tag_build =
tag_date = 0

View File

@@ -0,0 +1,7 @@
class ChatDifyAPI:
def __init__(self, base_url: str, app_sks: str):
self.base_url = base_url
self.app_sks = app_sks
def process_task(self, task_id: str, **kwargs):
pass

View File

@@ -0,0 +1,212 @@
import requests
from abc import ABC
import logging
import json
import re
import pypinyin
logger = logging.getLogger(__name__)
def pinyin_to_camel(pinyin):
"""
将拼音列表转换为驼峰命名
例如: ['ni', 'hao', 'a'] -> 'NiHaoA'
所有非字母数字字符会被替换为下划线
"""
# 使用正则表达式将所有非字母数字字符(包括所有标点符号)替换为下划线
cleaned = re.sub(r'[^\w\s]', '_', pinyin)
# 将空格也替换为下划线
cleaned = re.sub(r'\s+', '_', cleaned)
# 移除连续的下划线并去除首尾下划线
cleaned = re.sub(r'_+', '_', cleaned).strip('_')
# 转换为拼音并生成驼峰命名
pinyin_list = pypinyin.lazy_pinyin(cleaned)
return "tool_" + "".join(word.capitalize() for word in pinyin_list)
class CompletionDifyAPI(ABC):
def __init__(self, base_url: str, dify_app_sks: list, user="pp666"):
# dify configs
self.dify_base_url = base_url
self.dify_app_sks = dify_app_sks
self.user = user
# dify app infos
dify_app_infos = []
dify_app_params = []
dify_app_metas = []
for key in self.dify_app_sks:
dify_app_infos.append(self.get_app_info(key))
dify_app_params.append(self.get_app_parameters(key))
dify_app_metas.append(self.get_app_meta(key))
self.dify_app_infos = dify_app_infos
self.dify_app_params = dify_app_params
self.dify_app_metas = dify_app_metas
self.dify_app_names = [x["name"] for x in dify_app_infos]
def chat_message(
self,
api_key,
inputs={},
response_mode="streaming",
conversation_id=None,
userId="pp666",
files=None,
):
url = f"{self.dify_base_url}/completion-messages"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {
"inputs": inputs,
"response_mode": response_mode,
"user": userId,
}
if conversation_id:
data["conversation_id"] = conversation_id
if response_mode == "streaming":
response = requests.post(url, headers=headers, json=data, stream=True)
# 处理流式响应
full_answer = ""
for line in response.iter_lines():
if line:
# 跳过 "data:" 前缀
decoded_line = line.decode("utf-8")
if decoded_line.startswith("data:"):
try:
json_str = decoded_line[5:].strip()
data = json.loads(json_str)
if data.get("event") == "message" and "answer" in data:
# 累积完整答案
full_answer += data["answer"]
# 这里也可以选择处理每个部分响应,例如返回生成器
# yield data
except json.JSONDecodeError:
logger.warning(f"无法解析JSON数据: {decoded_line}")
# 创建一个符合非流式响应格式的结果
response_data = {"answer": full_answer}
# 处理可能包含代码块的数据
processed_data = self.process_answer_code_block(response_data)
return processed_data
else:
response = requests.post(url, headers=headers, json=data)
response_data = response.json()
# 处理可能包含代码块的数据
processed_data = self.process_answer_code_block(response_data)
return processed_data
def upload_file(self, api_key, file_path, user="pp666"):
url = f"{self.dify_base_url}/files/upload"
headers = {"Authorization": f"Bearer {api_key}"}
files = {"file": open(file_path, "rb")}
data = {"user": user}
response = requests.post(url, headers=headers, files=files, data=data)
response.raise_for_status()
return response.json()
def stop_response(self, api_key, task_id, user="pp666"):
url = f"{self.dify_base_url}/chat-messages/{task_id}/stop"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {"user": user}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
def get_app_info(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/info"
headers = {"Authorization": f"Bearer {api_key}"}
# params = {"user": user}
response = requests.get(url, headers=headers)
response.raise_for_status()
response_map = response.json()
# 翻译工具名称
from src.utils.tool_translation import TranslationService
tool_name = response_map.get("name")
if tool_name:
# translated_name = TranslationService.translate_tool_name(tool_name)
translated_name = pinyin_to_camel(tool_name)
response_map["name"] = translated_name
# 翻译工具描述
# tool_description = response_map.get("description")
# if tool_description:
# translated_description = TranslationService.translate_tool_description(
# tool_description
# )
# response_map["description"] = (
# f"{tool_description} ({translated_description})"
# )
return response_map
def get_app_parameters(self, api_key, user="pp666"):
return {
"user_input_form": [
{"string": {"variable": "query", "label": "查询内容", "required": True}}
]
}
def get_app_meta(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/meta"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
@staticmethod
def process_answer_code_block(data):
try:
# 获取answer字段
answer = data.get("answer", "")
# 构造符合workflow_finished格式的输出
formatted_response = [
{"event": "workflow_finished", "data": {"outputs": {"result": answer}}}
]
# 尝试处理可能的代码块
if answer.startswith("```") and answer.endswith("```"):
try:
# 移除代码块标记并解析JSON
code_content = answer.strip("```").strip()
json_data = json.loads(code_content)
# 如果包含description字段用它替换answer
if "description" in json_data:
formatted_response[0]["data"]["outputs"]["result"] = json_data[
"description"
]
except json.JSONDecodeError:
# 如果不是有效的JSON保留原始代码块内容
pass
return formatted_response
except Exception as e:
logger.warning(f"处理答案代码块时出错: {str(e)}")
# 发生错误时返回符合格式的基础响应
return [
{
"event": "workflow_finished",
"data": {
"outputs": {
"error": str(e),
"fallback": data.get("answer", str(data)),
}
},
}
]

View File

@@ -0,0 +1,104 @@
import requests
from abc import ABC
import logging
import json
logger = logging.getLogger(__name__)
res = {
"event": "message",
"task_id": "49c9ea1b-7b43-475b-a680-d769fb238a45",
"id": "432ab98e-5e36-4a29-abe5-e01281c3678c",
"message_id": "432ab98e-5e36-4a29-abe5-e01281c3678c",
"mode": "completion",
"answer": '```\n{\n "description": "该API的具体功能描述暂时不明确因为提供的API信息 \'今天打老虎啊按时啊啊\' 并不是有效的API名称或描述。请提供正确的API名称和相关输入输出信息以便我能为其补充完善的API描述。"\n}\n```',
"metadata": {
"usage": {
"prompt_tokens": 73,
"prompt_unit_price": "0.0",
"prompt_price_unit": "0.0",
"prompt_price": "0.0",
"completion_tokens": 61,
"completion_unit_price": "0.0",
"completion_price_unit": "0.0",
"completion_price": "0.0",
"total_tokens": 134,
"total_price": "0.0",
"currency": "USD",
"latency": 1.896302318200469,
}
},
"created_at": 1747233054,
}
def process_answer_code_block(data):
try:
# 获取answer字段
answer = data.get("answer", "")
# 检查answer是否是代码块格式
if answer.startswith("```") and answer.endswith("```"):
# 移除代码块标记并解析JSON
code_content = answer.strip("```").strip()
json_data = json.loads(code_content)
# 获取description字段
if "description" in json_data:
return json_data["description"]
# 如果不是预期格式则返回原始answer
return data.get("answer", data)
except Exception as e:
logger.warning(f"处理答案代码块时出错: {str(e)}")
# 发生错误时返回原始数据
return data.get("answer", data)
def chat_message_test(
api_key,
inputs={},
response_mode="blocking",
conversation_id=None,
userId="pp666",
files=None,
):
url = "https://ops.lzwcai.com/v1/completion-messages"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {
"inputs": inputs,
"response_mode": response_mode,
"user": userId,
}
if conversation_id:
data["conversation_id"] = conversation_id
if response_mode == "streaming":
response = requests.post(
url, headers=headers, json=data, stream=response_mode == "streaming"
)
return response
else:
response = requests.post(url, headers=headers, json=data)
return response.json()
if __name__ == "__main__":
print("开始执行主程序")
try:
print("准备调用chat_message方法")
res = chat_message_test(
api_key="app-Ppemii3c0ROPoLvRwskgZ7Il",
inputs={"query": "今天打老虎啊按时啊啊"},
response_mode="streaming",
userId="abc-123",
)
print("chat_message方法调用完成")
# 打印响应内容
print("响应内容:", res)
# print(process_answer_code_block(res))
except Exception as e:
print(f"执行过程中出现错误: {e}")

View File

@@ -0,0 +1,172 @@
import requests
from abc import ABC
import logging
import json
import re
import pypinyin
logger = logging.getLogger(__name__)
def pinyin_to_camel(pinyin):
"""
将拼音列表转换为驼峰命名
例如: ['ni', 'hao', 'a'] -> 'NiHaoA'
所有非字母数字字符会被替换为下划线
"""
# 使用正则表达式将所有非字母数字字符(包括所有标点符号)替换为下划线
cleaned = re.sub(r'[^\w\s]', '_', pinyin)
# 将空格也替换为下划线
cleaned = re.sub(r'\s+', '_', cleaned)
# 移除连续的下划线并去除首尾下划线
cleaned = re.sub(r'_+', '_', cleaned).strip('_')
# 转换为拼音并生成驼峰命名
pinyin_list = pypinyin.lazy_pinyin(cleaned)
return "tool_" + "".join(word.capitalize() for word in pinyin_list)
class DifyAPI(ABC):
def __init__(self, base_url: str, dify_app_sks: list, user="pp666"):
# dify configs
self.dify_base_url = base_url
self.dify_app_sks = dify_app_sks
self.user = user
# dify app infos
dify_app_infos = []
dify_app_params = []
dify_app_metas = []
for key in self.dify_app_sks:
dify_app_infos.append(self.get_app_info(key))
dify_app_params.append(self.get_app_parameters(key))
dify_app_metas.append(self.get_app_meta(key))
print("dify_app_params", dify_app_params)
self.dify_app_infos = dify_app_infos
self.dify_app_params = dify_app_params
self.dify_app_metas = dify_app_metas
self.dify_app_names = [x["name"] for x in dify_app_infos]
def chat_message(
self,
api_key,
inputs={},
response_mode="streaming",
conversation_id=None,
user="pp666",
files=None,
):
url = f"{self.dify_base_url}/workflows/run"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {
"inputs": inputs,
"response_mode": response_mode,
"user": user,
}
logger.info("Sending data to Dify API: %s", data)
logger.info("Sending headers to Dify API: %s", headers)
logger.info("Sending url to Dify API: %s", url)
if conversation_id:
data["conversation_id"] = conversation_id
if files:
files_data = []
for file_info in files:
file_path = file_info.get("path")
transfer_method = file_info.get("transfer_method")
if transfer_method == "local_file":
files_data.append(("file", open(file_path, "rb")))
elif transfer_method == "remote_url":
pass
response = requests.post(
url,
headers=headers,
data=data,
files=files_data,
stream=response_mode == "streaming",
)
else:
response = requests.post(
url, headers=headers, json=data, stream=response_mode == "streaming"
)
response.raise_for_status()
if response_mode == "streaming":
for line in response.iter_lines():
if line:
if line.startswith(b"data:"):
try:
json_data = json.loads(line[5:].decode("utf-8"))
yield json_data
except json.JSONDecodeError:
print(f"Error decoding JSON: {line}")
else:
return response.json()
def upload_file(self, api_key, file_path, user="pp666"):
url = f"{self.dify_base_url}/files/upload"
headers = {"Authorization": f"Bearer {api_key}"}
files = {"file": open(file_path, "rb")}
data = {"user": user}
response = requests.post(url, headers=headers, files=files, data=data)
response.raise_for_status()
return response.json()
def stop_response(self, api_key, task_id, user="pp666"):
url = f"{self.dify_base_url}/chat-messages/{task_id}/stop"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {"user": user}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
def get_app_info(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/info"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
from src.utils.tool_translation import TranslationService
response_map = response.json()
# 翻译工具名称
# tool_name = response_map.get("name")
# translated_name = TranslationService.translate_tool_name(tool_name)
# response_map["name"] = translated_name
# # 翻译工具描述
# tool_description = response_map.get("description")
# translated_description = TranslationService.translate_tool_description(
# tool_description
# )
# response_map["description"] = translated_description
tool_name = response_map.get("name")
if tool_name:
# translated_name = TranslationService.translate_tool_name(tool_name)
translated_name = pinyin_to_camel(tool_name)
response_map["name"] = translated_name
return response_map
def get_app_parameters(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/parameters"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def get_app_meta(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/meta"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()

View File

@@ -0,0 +1,318 @@
import asyncio
import json
import os
import logging
import argparse
from abc import ABC
import mcp.server.stdio
import mcp.types as types
import requests
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from omegaconf import OmegaConf
# from src.workflow.workflow_server import WorkflowDifyAPI
from src.difyTaskCall.task_instance import TaskInstance
# 配置日志记录
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def parse_arguments():
parser = argparse.ArgumentParser(description="Dify MCP服务器配置")
parser.add_argument(
"--base-url",
type=str,
help="API基础URL",
default="http://192.168.11.24:3001/v1",
)
parser.add_argument(
"--app-sks",
nargs="+",
help="应用秘钥列表",
default=["app-d7s00CJ2NY4LJzUEiZsVDnPN"],
)
parser.add_argument(
"--mode-type",
type=str,
help="Dify应用模式类型 (workflow, chat, completion)",
default="workflow",
choices=["workflow", "chat", "completion"],
)
return parser.parse_args()
def get_app_info(base_url=None, app_sks=None, mode_type=None):
# 获取命令行参数
args = parse_arguments()
# 命令行参数优先,其次是函数参数,最后是默认值
if args.base_url is not None:
base_url = args.base_url
if base_url is None:
base_url = "http://192.168.11.24:3001/v1"
if args.app_sks is not None:
app_sks = args.app_sks
if app_sks is None:
app_sks = ["app-d7s00CJ2NY4LJzUEiZsVDnPN"]
# 确保 app_sks 始终是列表类型
if isinstance(app_sks, str):
# 如果是字符串,转换为列表
app_sks = [app_sks]
if args.mode_type is not None:
mode_type = args.mode_type
if mode_type is None:
mode_type = "workflow"
return base_url, app_sks, mode_type
# return "https://dempdify.lzwcai.com/v1", ["app-X6wAy5nkvWB3hR69cgvIjC3r"], "workflow"
# 初始化服务器和Dify API
base_url, dify_app_sks, dify_app_mode_type = get_app_info()
server = Server("dify_mcp_server")
task_instance = TaskInstance(base_url, dify_app_sks, dify_app_mode_type)
dify_api = task_instance.get_task_instance(dify_app_mode_type)
def process_user_input_form(user_input_form):
"""
处理Dify应用的用户输入表单转换为JSON Schema格式
参数:
user_input_form: Dify应用的用户输入表单配置
返回:
处理后的inputSchema字典
"""
inputSchema = dict(
type="object",
properties={},
required=[],
)
property_num = len(user_input_form)
if property_num > 0:
for j in range(property_num):
param = user_input_form[j]
param_type = list(param.keys())[0]
param_info = param[param_type]
property_name = param_info["variable"]
# 根据不同控件类型处理
if param_type == "text-input":
inputSchema["properties"][property_name] = {
"type": "string",
"description": param_info["label"],
}
if "default" in param_info:
inputSchema["properties"][property_name]["default"] = param_info[
"default"
]
elif param_type == "paragraph":
inputSchema["properties"][property_name] = {
"type": "string",
"description": param_info["label"],
"format": "paragraph",
}
if "default" in param_info:
inputSchema["properties"][property_name]["default"] = param_info[
"default"
]
elif param_type == "select":
inputSchema["properties"][property_name] = {
"type": "string",
"description": param_info["label"],
"enum": param_info["options"],
}
if "default" in param_info:
inputSchema["properties"][property_name]["default"] = param_info[
"default"
]
elif param_type == "file_upload":
# 文件上传控件处理
file_type_schema = {
"type": "object",
"description": param_info["label"],
"properties": {
"file_url": {"type": "string", "description": "文件URL"},
"file_name": {"type": "string", "description": "文件名称"},
},
"required": ["file_url"],
}
# 处理图片上传配置
if "image" in param_info and param_info["image"]["enabled"]:
image_config = param_info["image"]
file_type_schema["properties"]["type"] = {
"type": "string",
"description": "文件类型支持png、jpg、jpeg、webp、gif",
"enum": ["png", "jpg", "jpeg", "webp", "gif"],
}
# 处理数量限制
number_limits = image_config.get("number_limits", 3)
if number_limits > 1:
# 如果允许多个文件,则使用数组
inputSchema["properties"][property_name] = {
"type": "array",
"description": param_info["label"],
"items": file_type_schema,
"maxItems": number_limits,
}
else:
# 如果只允许单个文件
inputSchema["properties"][property_name] = file_type_schema
else:
# 如果没有特定的图片配置,使用一般文件配置
inputSchema["properties"][property_name] = file_type_schema
else:
# 默认处理为字符串类型
inputSchema["properties"][property_name] = {
"type": "string",
"description": param_info["label"],
}
# 处理必填字段
if param_info.get("required", False):
inputSchema["required"].append(property_name)
# 添加必填的userId参数支持数字或字符串类型
# inputSchema["properties"]["userId"] = dict(
# oneOf=[{"type": "number"}, {"type": "string"}],
# description="您的员工ID用于识别您的员工身份",
# )
# inputSchema["required"].append("userId")
return inputSchema
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
列出可用的工具
返回:
工具列表每个工具都使用JSON Schema验证其参数
"""
tools = []
tool_names = dify_api.dify_app_names
tool_infos = dify_api.dify_app_infos
tool_params = dify_api.dify_app_params
tool_num = len(tool_names)
for i in range(tool_num):
# 加载每个工具的应用信息
app_info = tool_infos[i]
# 加载每个工具的应用参数
app_param = tool_params[i]
# 处理用户输入表单
inputSchema = process_user_input_form(app_param["user_input_form"])
tools.append(
types.Tool(
name=app_info["name"],
description=app_info["description"],
inputSchema=inputSchema,
)
)
return tools
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
调用工具处理请求
参数:
name: 工具名称
arguments: 工具参数
返回:
处理结果列表
"""
tool_names = dify_api.dify_app_names
if name in tool_names:
tool_idx = tool_names.index(name)
tool_sk = dify_api.dify_app_sks[tool_idx]
responses = dify_api.chat_message(
tool_sk,
inputs=arguments,
userId=arguments.get("userId", "pp666"),
)
for res in responses:
if res["event"] == "workflow_finished":
outputs = res["data"]["outputs"]
mcp_out = []
for _, v in outputs.items():
mcp_out.append(types.TextContent(type="text", text=v))
return mcp_out
else:
raise ValueError(f"Unknown tool: {name}")
def run_main(transport="stdio"):
"""
主函数使用stdin/stdout流运行服务器
"""
if transport == "stdio":
import anyio
from mcp.server.stdio import stdio_server
async def arun():
async with stdio_server() as streams:
await server.run(
streams[0],
streams[1],
InitializationOptions(
server_name="dify_mcp_server",
server_version="0.0.6",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
anyio.run(arun)
else:
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route
sse = SseServerTransport("/messages/")
async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(
streams[0], streams[1], server.create_initialization_options()
)
return Response()
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse, methods=["GET"]),
Mount("/messages/", app=sse.handle_post_message),
],
)
import uvicorn
uvicorn.run(starlette_app, host="0.0.0.0", port=8080, log_level="info")
if __name__ == "__main__":
run_main()

View File

@@ -0,0 +1,373 @@
import json
from typing import Dict, List, Any, Tuple, Optional
import logging
from pathlib import Path
# 常量定义
DEFAULT_NUMBER_LIMITS = 3 # 默认文件数量限制
def process_user_input_form(
user_input_form: List[Dict[str, Any]],
) -> Tuple[Dict[str, Any], List[str]]:
"""
处理Dify用户输入表单生成对应的JSON Schema properties和required列表
参数:
user_input_form: Dify应用的用户输入表单配置
返回:
properties: 表单字段的properties字典
required: 必填字段列表
"""
properties = {}
required = []
if not user_input_form:
return properties, required
for param in user_input_form:
try:
# 直接获取字典的第一个键,而不是通过list转换
param_type = next(iter(param))
param_info = param[param_type]
property_name = param_info["variable"]
properties[property_name] = {
"type": param_type,
"description": param_info["label"],
}
if param_info.get("required", False):
required.append(property_name)
except (KeyError, StopIteration) as e:
logging.warning(f"处理用户输入表单项时出错: {e}, 跳过此项")
continue
return properties, required
def process_file_upload(file_upload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
处理Dify文件上传配置生成对应的JSON Schema properties
设计为通用实现,支持任意文件类型配置
参数:
file_upload: Dify应用的文件上传配置
返回:
file_properties: 文件上传的properties字典
"""
# 检查是否存在文件上传配置
if not file_upload:
return {}
# 收集所有启用的文件类型信息
enabled_types = []
max_items = 0
supported_transfer_methods = set()
file_type_configs = {}
for file_type, config in file_upload.items():
if not config.get("enabled", False):
continue
enabled_types.append(file_type)
number_limits = config.get("number_limits", DEFAULT_NUMBER_LIMITS)
max_items += number_limits
type_transfer_methods = config.get("transfer_methods", [])
supported_transfer_methods.update(type_transfer_methods)
# 存储每种文件类型的详细配置
file_type_configs[file_type] = {
"number_limits": number_limits,
"transfer_methods": type_transfer_methods,
}
# 如果没有启用的文件类型,返回空字典
if not enabled_types:
return {}
# 构建文件项的JSON Schema
file_item_schema = _build_file_item_schema(
enabled_types, supported_transfer_methods, file_type_configs
)
# 构建最终的files属性
file_properties = {
"files": {
"type": "array",
"items": file_item_schema,
"description": "支持多种文件类型的文件列表",
"maxItems": max_items,
}
}
return file_properties
def _build_file_item_schema(
enabled_types: List[str],
supported_transfer_methods: set,
file_type_configs: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
"""
构建文件项的JSON Schema
参数:
enabled_types: 启用的文件类型列表
supported_transfer_methods: 支持的传输方式集合
file_type_configs: 每种文件类型的配置信息
返回:
file_item_schema: 文件项的JSON Schema
"""
file_item_schema = {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": enabled_types,
"description": "文件类型",
},
"transfer_method": {
"type": "string",
"enum": list(supported_transfer_methods),
"description": "传输方式",
},
},
}
# 添加条件属性
if "remote_url" in supported_transfer_methods:
file_item_schema["properties"]["url"] = {
"type": "string",
"format": "uri",
"description": "文件URL (当传输方式为remote_url时使用)",
}
if "local_file" in supported_transfer_methods:
file_item_schema["properties"]["upload_file_id"] = {
"type": "string",
"description": "上传文件ID (当传输方式为local_file时使用)",
}
# 添加条件验证逻辑
file_item_schema["allOf"] = []
# 为每种文件类型添加验证规则
for file_type, type_config in file_type_configs.items():
type_methods = type_config["transfer_methods"]
# 基本验证
file_item_schema["allOf"].append(
{
"if": {"properties": {"type": {"const": file_type}}},
"then": {
"properties": {
"transfer_method": {
"enum": type_methods,
"description": f"{file_type}类型支持的传输方式: {', '.join(type_methods)}",
}
}
},
}
)
# 为每种传输方法添加类型特定的验证
_add_transfer_method_validations(file_item_schema, file_type, type_methods)
return file_item_schema
def _add_transfer_method_validations(
file_item_schema: Dict[str, Any], file_type: str, type_methods: List[str]
) -> None:
"""
为每种传输方法添加类型特定的验证
参数:
file_item_schema: 文件项JSON Schema
file_type: 文件类型
type_methods: 该类型支持的传输方法列表
"""
for method in type_methods:
if method == "remote_url":
file_item_schema["allOf"].append(
{
"if": {
"properties": {
"type": {"const": file_type},
"transfer_method": {"const": "remote_url"},
},
"required": ["type", "transfer_method"],
},
"then": {"required": ["url"]},
}
)
elif method == "local_file":
file_item_schema["allOf"].append(
{
"if": {
"properties": {
"type": {"const": file_type},
"transfer_method": {"const": "local_file"},
},
"required": ["type", "transfer_method"],
},
"then": {"required": ["upload_file_id"]},
}
)
def convert_dify_params_to_schema(tool_params: Dict[str, Any]) -> Dict[str, Any]:
"""
将用户输入表单和文件上传配置组合成新的结构
参数:
tool_params: 包含user_input_form和file_upload的参数字典
返回:
组合后的结构
"""
# 参数验证
if not isinstance(tool_params, dict):
raise TypeError("tool_params 必须是字典类型")
# 处理用户输入表单
properties, required = process_user_input_form(
tool_params.get("user_input_form", [])
)
# 处理文件上传配置
file_properties = process_file_upload(tool_params.get("file_upload"))
# 创建新的结构
result = {
"inputs": {"type": "object", "properties": properties, "required": required},
# "userId": {
# "oneOf": [{"type": "number"}, {"type": "string"}],
# "description": "您的员工ID用于识别您的员工身份",
# "required": True,
# },
}
# 如果有文件上传配置添加files字段
if file_properties:
result["files"] = file_properties.get("files", {})
return result
def finalize_schema_structure(mock_result: Dict[str, Any]) -> Dict[str, Any]:
"""
将mock_result构建为符合要求的map_mock_result格式
参数:
mock_result: 通过convert_dify_params_to_schema函数获取的结果
返回:
构建后的map_mock_result字典
"""
# 确定required字段
# required_fields = ["userId"]
required_fields = []
# 只有当inputs的required有值时才添加inputs到顶层required
if (
mock_result.get("inputs", {}).get("required")
and len(mock_result["inputs"]["required"]) > 0
):
required_fields.append("inputs")
# 如果有文件上传也可以考虑添加files到required
if "files" in mock_result:
# 可以根据需求决定是否将files添加到required
# required_fields.append("files")
pass
return {
"type": "object",
"properties": mock_result,
"required": required_fields,
}
def create_json_file(data: Dict[str, Any], filename: str = "output.json") -> None:
"""
将数据保存为JSON文件
参数:
data: 要保存的数据
filename: 保存的文件名
"""
try:
# 获取mock数据
mock_result = convert_dify_params_to_schema(data)
# 使用封装的方法构建map_mock_result
map_mock_result = finalize_schema_structure(mock_result)
# 确保目标目录存在
output_path = Path(filename)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 将结果写入JSON文件
with open(filename, "w", encoding="utf-8") as f:
json.dump(map_mock_result, f, ensure_ascii=False, indent=4)
logging.info(f"已成功将数据保存至 {filename}")
print(f"已成功将数据保存至 {filename}")
except Exception as e:
error_msg = f"保存JSON文件时出错: {e}"
logging.error(error_msg)
raise IOError(error_msg)
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# 示例数据
api_data = {
"opening_statement": "",
"suggested_questions": [],
"suggested_questions_after_answer": {"enabled": False},
"speech_to_text": {"enabled": False},
"text_to_speech": {"enabled": False, "language": "", "voice": ""},
"retriever_resource": {"enabled": False},
"annotation_reply": {"enabled": False},
"more_like_this": {"enabled": False},
"user_input_form": [
{
"paragraph": {
"label": "文案内容",
"max_length": 33024,
"options": [],
"required": True,
"type": "paragraph",
"variable": "content",
}
}
],
"sensitive_word_avoidance": {"enabled": False},
"file_upload": {
"image": {
"enabled": False,
"number_limits": 3,
"transfer_methods": ["local_file", "remote_url"],
}
},
"system_parameters": {"image_file_size_limit": "10"},
}
try:
create_json_file(api_data)
except Exception as e:
logging.error(f"程序执行失败: {e}")

View File

@@ -0,0 +1,53 @@
from abc import ABC
# class WorkflowDifyAPI和chatDifyApi和completionDifyApi
# dify_app_mode_type workflow, chat, completion
class TaskInstance(ABC):
def __init__(self, base_url, dify_app_sks, dify_app_mode_type):
self.base_url = base_url
self.dify_app_sks = dify_app_sks
self.dify_app_mode_type = dify_app_mode_type
def get_task_instance(self, task_id: str):
"""
根据dify_app_mode_type返回相应的API实例
Args:
task_id: 任务ID
Returns:
返回对应的API实例
Raises:
ValueError: 当dify_app_mode_type无效时抛出异常
"""
from src.workflow.workflow_server import WorkflowDifyAPI
from src.completion.completion_server import CompletionDifyAPI
from src.chat.chat_server import ChatDifyAPI
# 使用字典映射提高代码灵活性和可维护性
api_classes = {
"workflow": WorkflowDifyAPI,
"chat": ChatDifyAPI,
"completion": CompletionDifyAPI,
}
# 检查mode_type是否有效
if self.dify_app_mode_type.lower() not in api_classes:
supported_types = ", ".join(api_classes.keys())
raise ValueError(
f"不支持的dify_app_mode_type: {self.dify_app_mode_type},支持的类型: {supported_types}"
)
# 获取对应的API类
api_class = api_classes[self.dify_app_mode_type.lower()]
# 这里假设所有API类都接受相同的参数集
# 如果各API类构造函数参数不同需要针对每种类型单独处理
return api_class(
self.base_url,
self.dify_app_sks,
)

View File

@@ -0,0 +1,153 @@
import os
import sys
from typing import Dict, Any, Optional
# 导入翻译函数
from .translator import translate
class TranslationService:
"""翻译服务类,用于处理各种翻译需求"""
@staticmethod
def create_prompt(
content: str,
target_lang: str,
use_case: str,
style: str,
prompt_type: str = "general",
keep_terms_desc: str = "核心术语",
) -> str:
"""
创建翻译提示
Args:
content: 待翻译内容
target_lang: 目标语言
use_case: 使用场景
style: 翻译风格
prompt_type: 提示类型,可选值: "general", "tool_name", "tool_description"
keep_terms_desc: 保留术语的描述
Returns:
str: 格式化的翻译提示
"""
if prompt_type == "tool_name":
keep_terms_desc = "核心术语(如 小写,词语需要用下划线连接)"
elif prompt_type == "tool_description":
keep_terms_desc = "核心术语(这是一段话)"
return f"""
角色:专业本地化翻译专家
任务:将以下内容翻译为{target_lang}(目标用途:{use_case}
要求:
1. 仅返回译文,不含解释或原文;
2. 保留{keep_terms_desc}
3. 符合{style}风格;
4. 特殊符号保持原样。
示例输出格式:
Translated Text
待翻译内容:
{content}
"""
@staticmethod
def translate_text(
content: str,
target_lang: str,
use_case: str = "",
style: str = "正式且符合技术品牌调性",
prompt_type: str = "general",
) -> Dict[str, Any]:
"""
翻译文本
Args:
content: 待翻译内容
target_lang: 目标语言
use_case: 使用场景,默认为空
style: 翻译风格,默认为"正式且符合技术品牌调性"
prompt_type: 提示类型,可选值: "general", "tool_name", "tool_description"
Returns:
Dict: 包含翻译结果的字典
"""
prompt = TranslationService.create_prompt(
content=content,
target_lang=target_lang,
use_case=use_case,
style=style,
prompt_type=prompt_type,
)
try:
result = translate(prompt, target_lang)
return result
except Exception as e:
print(f"翻译出错: {str(e)}")
return {"translated_text": "", "error": str(e)}
@staticmethod
def translate_tool_name(
name: str,
target_lang: str = "英语",
use_case: str = "工具名称",
style: str = "正式且符合技术品牌调性,大模型能理解",
) -> str:
"""
翻译工具名称的便捷方法
Returns:
str: 翻译后的工具名称
"""
result = TranslationService.translate_text(
content=name,
target_lang=target_lang,
use_case=use_case,
style=style,
prompt_type="tool_name",
)
return result.get("translated_text", "")
@staticmethod
def translate_tool_description(
description: str,
target_lang: str = "英语",
use_case: str = "工具描述",
style: str = "正式且符合技术品牌调性,大模型能理解",
) -> str:
"""
翻译工具描述的便捷方法
Returns:
str: 翻译后的工具描述
"""
result = TranslationService.translate_text(
content=description,
target_lang=target_lang,
use_case=use_case,
style=style,
prompt_type="tool_description",
)
return result.get("translated_text", "")
def translation_example():
"""翻译功能使用示例"""
# 示例1: 翻译工具名称
tool_name = "万川AI新媒体平台【测试环境】"
translated_name = TranslationService.translate_tool_name(tool_name)
print(f"工具名称翻译: {translated_name}")
# 示例2: 翻译工具描述
description = "21日辛柏青发布讣告宣布妻子朱媛媛抗癌五年后离世。此前在一次路演现场当观众问及朱媛媛时辛柏青2秒停顿藏着"
translated_desc = TranslationService.translate_tool_description(description)
print(f"工具描述翻译: {translated_desc}")
if __name__ == "__main__":
translation_example()

View File

@@ -0,0 +1,64 @@
import os
import requests
import json
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# ========== 模型相关 ==========
# 从.env文件获取模型API配置
BASE_URL = os.getenv("BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
API_KEY = os.getenv("OPENAI_API_KEY", "sk-c5a912a6bc8e4c9cbdbdf68232352a03")
TEMPERATURE = float(os.getenv("MODEL_TEMPERATURE", "0.7"))
def translate(content, target_language):
"""
翻译文本内容到目标语言
:param content: 要翻译的内容
:param target_language: 目标语言,如'en'(英语), 'zh'(中文), 'ja'(日语), 'fr'(法语)等
:return: 翻译后的内容,如果翻译失败则返回原文和错误信息
"""
if not content or not target_language:
return {"error": "内容或目标语言不能为空", "translated_text": content}
# 确保API密钥已设置
if not API_KEY:
return {"error": "API密钥未设置请检查.env文件", "translated_text": content}
try:
# 构建API请求头
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {API_KEY}",
}
# 构建翻译提示
prompt = f"请将以下内容翻译成{target_language},只返回翻译结果,不要包含任何解释或原文:\n\n{content}"
# 构建API请求体
data = {
"model": "qwen-max", # 使用通义千问模型,可以根据实际需要更改
"messages": [{"role": "user", "content": prompt}],
"temperature": TEMPERATURE,
}
# 发送API请求
response = requests.post(
f"{BASE_URL}/chat/completions", headers=headers, json=data
)
# 解析响应
if response.status_code == 200:
result = response.json()
translated_text = result["choices"][0]["message"]["content"].strip()
return {"translated_text": translated_text}
else:
error_message = (
f"翻译失败,状态码: {response.status_code}, 响应: {response.text}"
)
return {"error": error_message, "translated_text": content}
except Exception as e:
return {"error": f"翻译过程中发生错误: {str(e)}", "translated_text": content}

View File

@@ -0,0 +1,175 @@
import requests
from abc import ABC
import logging
import json
import re
logger = logging.getLogger(__name__)
import pypinyin
def pinyin_to_camel(pinyin):
"""
将拼音列表转换为驼峰命名
例如: ['ni', 'hao', 'a'] -> 'NiHaoA'
所有非字母数字字符会被替换为下划线
"""
# 使用正则表达式将所有非字母数字字符(包括所有标点符号)替换为下划线
cleaned = re.sub(r'[^\w\s]', '_', pinyin)
# 将空格也替换为下划线
cleaned = re.sub(r'\s+', '_', cleaned)
# 移除连续的下划线并去除首尾下划线
cleaned = re.sub(r'_+', '_', cleaned).strip('_')
# 转换为拼音并生成驼峰命名
pinyin_list = pypinyin.lazy_pinyin(cleaned)
return "tool_" + "".join(word.capitalize() for word in pinyin_list)
class WorkflowDifyAPI(ABC):
def __init__(self, base_url: str, dify_app_sks: list, user="pp666"):
# dify configs
self.dify_base_url = base_url
self.dify_app_sks = dify_app_sks
self.user = user
# dify app infos
dify_app_infos = []
dify_app_params = []
dify_app_metas = []
for key in self.dify_app_sks:
dify_app_infos.append(self.get_app_info(key))
dify_app_params.append(self.get_app_parameters(key))
dify_app_metas.append(self.get_app_meta(key))
self.dify_app_infos = dify_app_infos
self.dify_app_params = dify_app_params
self.dify_app_metas = dify_app_metas
self.dify_app_names = [x["name"] for x in dify_app_infos]
def chat_message(
self,
api_key,
inputs={},
response_mode="streaming",
conversation_id=None,
userId="pp666",
files=None,
):
url = f"{self.dify_base_url}/workflows/run"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {
"inputs": inputs,
"response_mode": response_mode,
"user": userId,
}
logger.info("Sending data to Dify API: %s", data)
logger.info("Sending headers to Dify API: %s", headers)
logger.info("Sending url to Dify API: %s", url)
if conversation_id:
data["conversation_id"] = conversation_id
if files:
files_data = []
for file_info in files:
file_path = file_info.get("path")
transfer_method = file_info.get("transfer_method")
if transfer_method == "local_file":
files_data.append(("file", open(file_path, "rb")))
elif transfer_method == "remote_url":
pass
response = requests.post(
url,
headers=headers,
data=data,
files=files_data,
stream=response_mode == "streaming",
)
else:
response = requests.post(
url, headers=headers, json=data, stream=response_mode == "streaming"
)
response.raise_for_status()
if response_mode == "streaming":
for line in response.iter_lines():
if line:
if line.startswith(b"data:"):
try:
json_data = json.loads(line[5:].decode("utf-8"))
yield json_data
except json.JSONDecodeError:
print(f"Error decoding JSON: {line}")
else:
return response.json()
def upload_file(self, api_key, file_path, user="pp666"):
url = f"{self.dify_base_url}/files/upload"
headers = {"Authorization": f"Bearer {api_key}"}
files = {"file": open(file_path, "rb")}
data = {"user": user}
response = requests.post(url, headers=headers, files=files, data=data)
response.raise_for_status()
return response.json()
def stop_response(self, api_key, task_id, user="pp666"):
url = f"{self.dify_base_url}/chat-messages/{task_id}/stop"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
data = {"user": user}
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json()
def get_app_info(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/info"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
from src.utils.tool_translation import TranslationService
response_map = response.json()
# 翻译工具名称
tool_name = response_map.get("name")
if tool_name:
# translated_name = TranslationService.translate_tool_name(tool_name)
translated_name = pinyin_to_camel(tool_name)
response_map["name"] = translated_name
# 翻译工具描述
# tool_description = response_map.get("description")
# if tool_description:
# translated_description = TranslationService.translate_tool_description(
# tool_description
# )
# response_map["description"] = (
# f"{tool_description} ({translated_description})"
# )
return response_map
def get_app_parameters(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/parameters"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def get_app_meta(self, api_key, user="pp666"):
url = f"{self.dify_base_url}/meta"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"user": user}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()