Files
page-agent/demo_erp_scrape.py
Zou-Seay 9f22a730df
Some checks failed
CI / test (24) (push) Has been cancelled
Deploy Demo / deploy (push) Has been cancelled
fix(extension): resolve race condition in hub config and fix tab initialization crash
add: erp scrape python demo
2026-06-25 15:31:41 +08:00

59 lines
2.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# 加载当前目录下的 .env 文件
load_dotenv()
async def run_browser_automation():
print("⏳ 正在启动 Page Agent MCP 服务器...")
print("👉 如果你没有打开浏览器,系统会自动帮你唤起默认浏览器!\n")
# 1. 配置 MCP 服务端
# 它会自动读取当前系统的 PATH调用 npx 启动 @page-agent/mcp
server_params = StdioServerParameters(
command="npx",
args=["-y", "@page-agent/mcp"],
env={
**os.environ,
"LLM_BASE_URL": os.getenv("LLM_BASE_URL", ""),
"LLM_API_KEY": os.getenv("LLM_API_KEY", ""),
"LLM_MODEL_NAME": os.getenv("LLM_MODEL_NAME", "qwen-plus")
}
)
# 2. 通过标准输入输出Stdio连接到插件的 MCP 服务
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化握手
await session.initialize()
print("✅ 成功连接到了 MCP 服务端!正在等待浏览器插件握手...")
# 轮询等待浏览器插件连上来
for _ in range(15):
status_res = await session.call_tool("get_status", {})
if '"connected": true' in status_res.content[0].text:
break
await asyncio.sleep(1)
else:
print("❌ 浏览器插件连接超时!请确保 Chrome 浏览器正常运行,并启用了 Page Agent 插件。")
return
task_prompt = "请你先导航到 https://preview.pro.ant.design/list/table-list/ 页面,然后使用自定义工具 scrapeErpOrdersTool 来抓取当前页面的所有分页表格数据。抓取完成后,请告诉我一共抓取到了多少条数据,并列出前三条作为示例。"
print(f"🤖 派发自然语言任务: 【{task_prompt}\n")
print("👁️‍🗨️ 正在观测浏览器执行... (你可以切回浏览器看着它自动操作,大概需要几十秒)")
# 调用 MCP 的 execute_task 工具
result = await session.call_tool("execute_task", {
"task": task_prompt
})
print("\n🎉 浏览器任务执行完毕,大模型返回的最终结果是:")
print(result.content[0].text)
if __name__ == "__main__":
asyncio.run(run_browser_automation())