Files
datie-bom/browser_login/login.py
2026-06-26 20:04:22 +08:00

526 lines
21 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ERP 登录模块 - DrissionPage
"""
import os
import sys
import time
import shutil
import json
import datetime
import subprocess
import urllib.request
from pathlib import Path
from dotenv import load_dotenv
from DrissionPage import ChromiumPage, ChromiumOptions
import DrissionPage._base.driver as dp_driver_module
from DrissionPage._base.chromium import Chromium, handle_options
from DrissionPage._base.driver import BrowserDriver
from DrissionPage._pages.chromium_tab import ChromiumTab
from websocket import create_connection as raw_ws_create_connection, WebSocketBadStatusException
# ── Load .env ─────────────────────────────────────────────────────────────────
load_dotenv(Path(__file__).with_name(".env"))

# Make Python's websocket client bypass any local proxy; otherwise the DevTools
# handshake can fail with "Handshake status 404 Not Found".
os.environ.update(
    NO_PROXY="localhost,127.0.0.1,::1",
    no_proxy="localhost,127.0.0.1,::1",
)

# ERP endpoint and credentials, read from the environment (populated by .env).
ERP_URL = os.environ.get("ERP_URL", "https://yunmes.tftykj.cn/#")
ERP_TENANT = os.environ.get("ERP_TENANT", "")
ERP_USERNAME = os.environ.get("ERP_USERNAME", "")
ERP_PASSWORD = os.environ.get("ERP_PASSWORD", "")
def is_docker_env() -> bool:
    """Return True when the marker file ``/.dockerenv`` exists."""
    docker_marker = Path("/.dockerenv")
    return docker_marker.exists()
def is_linux_env() -> bool:
    """Return True when ``sys.platform`` identifies a Linux platform."""
    platform_name = sys.platform
    return platform_name.startswith("linux")
def patch_drission_ws_handshake() -> None:
    """
    Install compatibility fallbacks for DrissionPage's DevTools WebSocket
    handshake on Linux.

    Two monkeypatches are applied, each guarded by its own flag stored on
    ``DrissionPage._base.driver`` so it is installed at most once:

    * ``create_connection`` (in DrissionPage's driver module and in the
      ``websocket`` package) is replaced by a wrapper that tries several
      handshake parameter sets in turn.
    * ``DrissionPage._functions.browser._run_browser`` is wrapped so that a
      2-second pause follows every browser launch.

    Outside Linux nothing is patched.
    """
    # Already patched by an earlier call: nothing to do.
    if getattr(dp_driver_module, "_DTSK_WS_PATCHED", False):
        return

    def resilient_create_connection(address, **kwargs):
        # Drop-in replacement for websocket.create_connection that retries the
        # handshake with different Origin settings.
        log("INFO", f"[DEBUG-WS] websocket-client trying to connect to: {address}")
        base_kwargs = dict(kwargs)
        # Force-disable proxies so the request is not redirected by network
        # rules inside the container.
        no_proxy_hosts = ["127.0.0.1", "localhost", "::1"]
        base_kwargs["http_no_proxy"] = no_proxy_hosts
        base_kwargs["http_proxy_host"] = None
        base_kwargs["http_proxy_port"] = None
        # Original author's note: Chrome 149 validates strictly; with
        # suppress_origin=True and no valid Host/Origin the handshake gets a 404.
        # Prepare a few clean variants of base_kwargs.
        clean_kwargs = dict(base_kwargs)
        clean_kwargs.pop("suppress_origin", None)
        candidate_kwargs = [
            # Strategy 1: cleanest connection, suppress_origin forced off.
            {
                **clean_kwargs,
                "suppress_origin": False
            },
            # Strategy 2: send an explicit Origin header.
            {
                **clean_kwargs,
                "suppress_origin": False,
                "header": ["Origin: http://127.0.0.1"]
            },
            # Strategy 3: respect the caller's original arguments
            # (DrissionPage's default behaviour).
            {
                **base_kwargs
            }
        ]
        last_err = None
        for i, candidate in enumerate(candidate_kwargs, 1):
            try:
                # Always use 127.0.0.1, because "localhost" may resolve
                # unexpectedly inside Docker.
                target_url = address.replace("localhost", "127.0.0.1")
                log("INFO", f"[DEBUG-WS] 尝试策略 {i} 连接 {target_url} (suppress_origin={candidate.get('suppress_origin')}, header={candidate.get('header')})")
                return raw_ws_create_connection(target_url, **candidate)
            except WebSocketBadStatusException as ws_err:
                # Bad handshake status: fall through to the next strategy.
                log("WARN", f"[DEBUG-WS] 策略 {i} 失败 (WebSocketBadStatusException): {ws_err}")
                last_err = ws_err
            except Exception as other_err:
                # NOTE(review): indentation was reconstructed; `break` is
                # assumed to belong to this handler, i.e. any other error
                # stops the retries immediately.
                log("WARN", f"[DEBUG-WS] 策略 {i} 失败 (Exception): {other_err}")
                last_err = other_err
                break
        # Every attempted strategy failed: surface the most recent error.
        raise last_err

    # Re-add the WS handshake patch; both patches (this one and the
    # _run_browser one below) are needed.
    if is_linux_env() and not getattr(dp_driver_module, "_DTSK_WS_PATCHED", False):
        dp_driver_module.create_connection = resilient_create_connection
        import websocket
        websocket.create_connection = resilient_create_connection
        # Same module as dp_driver_module, imported under another alias.
        import DrissionPage._base.driver as driver_mod
        driver_mod.create_connection = resilient_create_connection
        dp_driver_module._DTSK_WS_PATCHED = True

    # Adding a delay in front of ChromiumPage(opt) had no effect, because
    # run_browser is only triggered while ChromiumPage is being instantiated.
    # The real fix is to intercept the library code that requests /json/version.
    if is_linux_env() and not getattr(dp_driver_module, "_DTSK_UUID_PATCHED", False):
        import DrissionPage._functions.browser as dp_browser_module
        # Keep a reference to the library's own _run_browser.
        original_run_browser = dp_browser_module._run_browser

        def resilient_run_browser(port, path, args):
            # Launch via the original function, then pause 2 s before returning.
            log("INFO", "[DEBUG] 拦截到 _run_browser 启动 Chrome强制等待 2 秒以防 UUID 跳变...")
            res = original_run_browser(port, path, args)
            time.sleep(2)
            return res

        # Swap the wrapper in.
        dp_browser_module._run_browser = resilient_run_browser
        dp_driver_module._DTSK_UUID_PATCHED = True
# Apply the DrissionPage patches (the function only patches on Linux).
patch_drission_ws_handshake()
def get_docker_tmp_root() -> Path:
    """
    Return the temporary root directory handed to DrissionPage in Docker.

    The parent directory comes from ``DRISSION_TMP_ROOT`` (default ``/tmp``)
    and a ``DrissionPage`` sub-directory is appended and created if missing.
    Per the original author's note, auto_port() creates its per-port user
    directories underneath it.
    """
    parent_dir = os.environ.get("DRISSION_TMP_ROOT", "/tmp")
    drission_root = Path(parent_dir).joinpath("DrissionPage")
    drission_root.mkdir(exist_ok=True, parents=True)
    return drission_root
def resolve_browser_path() -> str:
    """
    Resolve the browser executable to launch.

    Lookup order:
      1. ``DRISSION_BROWSER_PATH``, ``CHROME_BIN``, ``BROWSER_PATH`` — the
         first one that names an existing path wins.
      2. On Linux, fixed install locations, Google Chrome before Chromium
         (the original author prefers Chrome Stable to avoid DevTools
         WebSocket handshake incompatibilities with Chromium).
      3. Elsewhere, the macOS Chrome bundle, then executables found on PATH.

    Returns:
        The first existing candidate, or "" when none is found.
    """
    env_names = ("DRISSION_BROWSER_PATH", "CHROME_BIN", "BROWSER_PATH")
    configured = [os.getenv(name, "").strip() for name in env_names]
    from_env = next((p for p in configured if p and os.path.exists(p)), "")
    if from_env:
        return from_env

    if is_linux_env():
        fallbacks = [
            "/usr/bin/google-chrome",
            "/usr/bin/google-chrome-stable",
            "/usr/bin/chromium",
            "/usr/bin/chromium-browser",
        ]
    else:
        on_path = [
            shutil.which(exe) or ""
            for exe in ("google-chrome", "chromium", "chromium-browser")
        ]
        fallbacks = [
            "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
            *on_path,
        ]

    return next((p for p in fallbacks if p and os.path.exists(p)), "")
def cleanup_debug_port(address: str) -> None:
    """
    Kill processes holding the DevTools port and remove stale browser lock files.

    Args:
        address: DevTools address such as ``"127.0.0.1:9222"``. Anything after
            the port (for example a ``/devtools/...`` path) is ignored. An
            empty value, or one without ``":"``, makes the call a no-op.

    The port must be plain decimal digits and is passed to ``lsof`` as an
    argument list, so it is never interpreted by a shell. The calling process
    is never killed, even if it still holds a client socket on that port.
    All steps are best effort: failures are ignored.
    """
    if not address or ":" not in address:
        return

    import signal  # local import: only this function needs it

    port_text = address.rsplit(":", 1)[-1].split("/", 1)[0].strip()

    # 1. Kill whatever still has a socket open on the port.
    if port_text.isascii() and port_text.isdigit():
        try:
            listing = subprocess.run(
                ["lsof", "-ti", f"tcp:{port_text}"],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                check=False,
            )
            pid_tokens = listing.stdout.split()
        except OSError:
            # lsof is missing or not runnable: nothing we can kill.
            pid_tokens = []

        own_pid = os.getpid()
        kill_signal = getattr(signal, "SIGKILL", signal.SIGTERM)
        for token in pid_tokens:
            try:
                pid = int(token)
                # lsof also lists clients of the port; never kill ourselves.
                if pid != own_pid:
                    os.kill(pid, kill_signal)
            except (ValueError, OSError):
                # Unparsable line, or the process is gone / not ours.
                continue

    # 2. On Linux, remove leftover DrissionPage/Chromium temp entries so the
    #    next launch does not hit "Failed to create SingletonLock".
    if is_linux_env():
        subprocess.run(
            "rm -rf /tmp/DrissionPage* /tmp/.org.chromium.*",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
def clear_drission_singletons() -> None:
    """
    Empty DrissionPage's class-level instance caches.

    Per the original author's note, half-initialised objects left behind by a
    failed start-up stay in these caches and would be reused by a later retry.
    """
    registries = (
        (Chromium, "_BROWSERS"),
        (ChromiumPage, "_PAGES"),
        (ChromiumTab, "_TABS"),
        (BrowserDriver, "BROWSERS"),
    )
    for owner, attr_name in registries:
        getattr(owner, attr_name).clear()
def probe_devtools_endpoints(address: str, log_output: bool = True) -> dict:
    """
    Query the DevTools HTTP endpoints ``/json/version`` and ``/json/list``.

    Args:
        address: ``host:port`` of the DevTools server; a falsy value skips
            probing entirely.
        log_output: When True, log the status, up to 1000 characters of the
            body, and any failure.

    Returns:
        ``{"version": ..., "list": ...}`` holding the parsed JSON of each
        endpoint, or None for an endpoint that could not be fetched or parsed.
    """
    probed = {"version": None, "list": None}
    if not address:
        return probed

    for name in ("version", "list"):
        endpoint = f"json/{name}"
        url = f"http://{address}/{endpoint}"
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                body = resp.read().decode("utf-8", errors="replace")
                status = resp.status
            if log_output:
                log("WARN", f"[DEBUG] DevTools 探测 {url} -> HTTP {status}")
                log("WARN", f"[DEBUG] DevTools 响应体 {endpoint}: {body[:1000]}")
            probed[name] = json.loads(body)
        except Exception as probe_err:
            # Best-effort diagnostics: a failed probe leaves the slot as None.
            if log_output:
                log("WARN", f"[DEBUG] DevTools 探测失败 {url}: {probe_err}")
    return probed
def get_first_page_ws_address(devtools_payload: dict) -> str:
    """
    Pick the WebSocket URL of the first usable tab in a ``/json/list`` result.

    A tab qualifies when it is a dict whose type is "page" or "webview", whose
    URL does not start with ``devtools://``, and which carries a non-empty
    ``webSocketDebuggerUrl``.

    Returns:
        That URL, or "" when the "list" entry is missing, is not a list, or
        has no qualifying tab.
    """
    entries = devtools_payload.get("list") or []
    if not isinstance(entries, list):
        return ""

    def usable(entry) -> bool:
        if not isinstance(entry, dict):
            return False
        if entry.get("type") not in ("page", "webview"):
            return False
        if str(entry.get("url", "")).startswith("devtools://"):
            return False
        return bool(entry.get("webSocketDebuggerUrl"))

    return next(
        (entry["webSocketDebuggerUrl"] for entry in entries if usable(entry)),
        "",
    )
# ── Logging ───────────────────────────────────────────────────────────────────
def log(level: str, msg: str):
    """Print one timestamped line, ``[HH:MM:SS.mmm] <icon> [LEVEL] msg``, and flush."""
    prefix_by_level = {"INFO": " ", "OK": "", "WARN": "⚠️ ", "ERR": ""}
    now = datetime.datetime.now()
    # Millisecond precision: the first three digits of the microsecond field.
    stamp = f"{now:%H:%M:%S}.{now.microsecond // 1000:03d}"
    prefix = prefix_by_level.get(level, " ")
    print(f"[{stamp}] {prefix} [{level}] {msg}", flush=True)
def dump_page_state(page: ChromiumPage, label: str = ""):
    """
    Log a short snapshot of the page for troubleshooting (meant for error paths).

    Logs the current URL and title, plus the text of any visible error
    element matching ``.el-message--error`` or ``.error-msg``.
    """
    suffix = f" ({label})" if label else ""
    log("WARN", f"─── 页面快照{suffix} ───")
    log("WARN", f" URL : {page.url}")
    log("WARN", f" 标题 : {page.title}")
    # Error hints; timeout=0 so the lookups do not block.
    for selector in (".el-message--error", ".error-msg"):
        found = page.ele(selector, timeout=0)
        if not found:
            continue
        hint = found.text.strip()
        if hint:
            log("WARN", f" 页面提示: {hint}")
    log("WARN", "─────────────────────────────")
# ── Browser ───────────────────────────────────────────────────────────────────
def get_page(headless: bool = False, port: int = 9222) -> ChromiumPage:
    """
    Launch a Chromium browser through DrissionPage and return its page object.

    Start-up is attempted in up to three stages:
      1. Normal launch with the full argument set.
      2. On Linux only, when the failure text contains
         "Handshake status 404 Not Found" and ``/json/list`` exposes a page
         target: connect directly to that page's WebSocket URL.
      3. Kill leftover browsers, then relaunch with a fresh, minimal option
         set on an automatically chosen port.

    Args:
        headless: Run without a window. Always treated as True inside Docker.
            The stage-3 relaunch honours this flag too.
        port: DevTools port for stage 1 outside Docker. Inside Docker
            ``auto_port()`` is used and this value is ignored.

    Returns:
        A connected ChromiumPage.

    Raises:
        Exception: the most recent start-up error when every stage fails.
    """
    co = ChromiumOptions()
    is_docker = is_docker_env()
    browser_path = resolve_browser_path()
    effective_headless = headless or is_docker

    if is_docker:
        clear_drission_singletons()
        # Brute-force removal of leftover Chrome processes and DrissionPage
        # temp files so a zombie Chrome cannot interfere with auto_port.
        subprocess.run("pkill -9 -f chrome || true", shell=True, stderr=subprocess.DEVNULL)
        subprocess.run("rm -rf /tmp/DrissionPage*", shell=True, stderr=subprocess.DEVNULL)

    if effective_headless:
        co.set_argument("--headless=new")
        co.set_argument("--disable-gpu")

    # Works around Chrome silently crashing on Linux when a zombie process
    # still holds the SingletonLock.
    if is_linux_env():
        co.set_argument("--disable-features=ProcessSingleton")

    co.set_argument("--disable-blink-features=AutomationControlled")
    co.set_argument("--no-sandbox")
    co.set_argument("--disable-dev-shm-usage")
    co.set_argument("--disable-software-rasterizer")
    co.set_argument("--remote-allow-origins=*")
    co.set_argument("--remote-debugging-address=127.0.0.1")
    co.set_argument("--disable-web-security")
    co.set_argument("--ignore-certificate-errors")
    co.set_argument("--proxy-server=direct://")
    co.set_argument("--proxy-bypass-list=*")
    co.set_argument("--window-size=1440,900")

    if browser_path:
        co.set_browser_path(browser_path)
        log("INFO", f"选用浏览器内核: {browser_path}")
    else:
        log("WARN", "未解析到明确浏览器路径,将使用 DrissionPage 默认浏览器发现逻辑。")

    if is_docker:
        tmp_root = get_docker_tmp_root()
        co.set_tmp_path(str(tmp_root))
        co.auto_port(True)
        log("INFO", f"Docker Drission 临时目录: {tmp_root}")
    else:
        co.set_local_port(port)

    # #region debug-point A:drission-target
    opt = handle_options(co)
    log(
        "INFO",
        "[DEBUG] Chromium 连接参数: "
        f"address={opt.address or '<empty>'}, "
        f"browser_path={opt.browser_path or '<auto>'}, "
        f"auto_port={opt.is_auto_port}, "
        f"headless={opt.is_headless}, "
        f"user_data_path={getattr(opt, 'user_data_path', '') or '<auto>'}"
    )
    log(
        "INFO",
        "[DEBUG] Chromium 启动参数: "
        + " | ".join(opt.arguments)
    )
    # #endregion

    try:
        # local_port may not exist on the options object; getattr avoids an error.
        log("INFO", f"[DEBUG] 准备实例化 ChromiumPage, address={opt.address}, local_port={getattr(opt, 'local_port', 'auto')}")
        page = ChromiumPage(opt)
        log("OK", "[DEBUG] ChromiumPage 实例化成功!")
        return page
    except Exception as e:
        actual_address = opt.address or f"127.0.0.1:{port}"
        log("WARN", f"[DEBUG] ChromiumPage 初始化失败 (第一次尝试): {e}")

        # Only probe when the options carry a real address.
        devtools_payload = (
            probe_devtools_endpoints(actual_address, log_output=True)
            if opt.address
            else {"version": None, "list": None}
        )
        fallback_page_ws = get_first_page_ws_address(devtools_payload)

        # Stage 2: the browser-level WS handshake was rejected, but a page
        # target exists, so try attaching to the page socket instead.
        if is_linux_env() and fallback_page_ws and "Handshake status 404 Not Found" in str(e):
            log("WARN", f"[DEBUG] Browser WS 握手失败,尝试降级连接 Page WS: {fallback_page_ws}")
            try:
                clear_drission_singletons()
                fallback_co = ChromiumOptions()
                fallback_co.set_address(fallback_page_ws)
                page = ChromiumPage(fallback_co)
                log("OK", "[DEBUG] 已通过 Page WS 降级连接成功。")
                return page
            except Exception as ws_only_e:
                log("ERR", f"[DEBUG] Page WS 降级连接失败: {ws_only_e}")
                e = ws_only_e

        # Stage 3: clean everything up and relaunch from scratch.
        log("WARN", "[DEBUG] 尝试清理并完全重启浏览器...")
        try:
            clear_drission_singletons()
            cleanup_debug_port(actual_address)
            # NOTE(review): this kills every process whose command line
            # contains "chrome", including unrelated browsers on a desktop.
            subprocess.run("pkill -9 -f chrome || true", shell=True, stderr=subprocess.DEVNULL)
            time.sleep(1)

            # Do not reuse opt: it may still carry the previous ws_address.
            # A fresh, minimal option set is built here instead of recursing.
            new_co = ChromiumOptions()
            # Honour the requested mode; forcing headless here would make a
            # windowed (manual-login) session impossible after a retry.
            if effective_headless:
                new_co.set_argument("--headless=new")
                new_co.set_argument("--disable-gpu")
            new_co.set_argument("--no-sandbox")
            new_co.set_argument("--disable-dev-shm-usage")
            new_co.set_argument("--remote-allow-origins=*")
            new_co.set_argument("--remote-debugging-address=127.0.0.1")
            new_co.auto_port(True)
            if browser_path:
                new_co.set_browser_path(browser_path)

            log("INFO", "[DEBUG] 等待 2 秒,让清理后重启的 Chrome 彻底准备好 UUID...")
            time.sleep(2)
            page = ChromiumPage(new_co)
            log("OK", "[DEBUG] 清理后重试实例化 ChromiumPage 成功!")
            return page
        except Exception as retry_e:
            log("ERR", f"[DEBUG] 清理后重试依然失败 (第二次尝试): {retry_e}")
            e = retry_e

        # #region debug-point B:devtools-http-probe
        if not opt.address:
            log("WARN", "[DEBUG] DevTools 探测跳过address 为空")
        # #endregion

        log("ERR", f"浏览器初始化失败: {e}")
        raise e
# ── Vue-form input helper (JS setter + simulated keyboard) ────────────────────
def set_input_value(page: ChromiumPage, ele, value: str):
    """
    Replace the content of a Vue-bound input element with ``value``.

    Per the original author's note, clear()+input() leaves a residual value
    under Vue two-way binding. The field is therefore emptied through the
    native ``HTMLInputElement`` value setter plus an ``input`` event, and the
    new value is then typed with simulated keyboard input.
    """
    reset_script = (
        "\n"
        "var el = arguments[0];\n"
        "Object.getOwnPropertyDescriptor(\n"
        "window.HTMLInputElement.prototype, 'value'\n"
        ").set.call(el, '');\n"
        "el.dispatchEvent(new Event('input', {bubbles: true}));\n"
    )
    page.run_js(reset_script, ele)
    # Give Vue a moment to process the clear event before typing.
    time.sleep(0.05)
    ele.input(value)
# ── Automatic login (credentials from .env) ───────────────────────────────────
def login(page: ChromiumPage) -> bool:
    """
    Open the ERP login page, fill in the .env credentials and submit.

    Args:
        page: Browser page to drive.

    Returns:
        True when already logged in or when the login succeeds; False when the
        form, one of its fields or the login button cannot be found, or when
        the login itself fails or times out.
    """
    log("INFO", f"打开登录页: {ERP_URL}")
    page.get(ERP_URL)

    # Wait briefly for the login form; if it never shows up, check whether the
    # session is already logged in.
    tenant_input = page.ele("@placeholder=租户代码", timeout=5)
    if not tenant_input:
        # No login form: the main UI has no password box, so its absence is
        # treated as "already logged in".
        if not page.ele("@placeholder=密码", timeout=0):
            log("OK", f"检测到已登录状态,跳过登录 → {page.url}")
            return True
        log("ERR", "未找到登录表单,页面可能未加载")
        dump_page_state(page, "找不到表单")
        return False

    log("INFO", "填写登录信息...")
    set_input_value(page, tenant_input, ERP_TENANT)
    for placeholder, value in (("账号", ERP_USERNAME), ("密码", ERP_PASSWORD)):
        field = page.ele(f"@placeholder={placeholder}")
        if not field:
            # Fail cleanly instead of crashing on a missing element.
            log("ERR", f"未找到输入框: {placeholder}")
            dump_page_state(page, "找不到表单")
            return False
        set_input_value(page, field, value)
    # Let the change/blur events of all fields settle.
    time.sleep(0.2)

    # Click the login button.
    login_btn = page.ele(
        "xpath://*[@id='app']/div/div[3]/div/div[2]/div[2]/div[5]/button",
        timeout=5
    )
    if not login_btn:
        log("ERR", "未找到登录按钮(XPath 失效,请更新选择器)")
        dump_page_state(page, "找不到登录按钮")
        return False

    log("INFO", "点击登录...")
    login_btn.click()
    return _wait_for_login(page)
# ── Manual login (wait for the user to act in the browser) ────────────────────
def login_manual(page: ChromiumPage) -> bool:
    """
    Open the login page and wait for the user to log in by hand.

    Args:
        page: Browser page to drive.

    Returns:
        True once the login is detected; False when the login form never
        appears, an error message is shown, or 120 seconds pass.
    """
    log("INFO", f"打开登录页(手动模式): {ERP_URL}")
    page.get(ERP_URL)

    # Make sure the login page has actually loaded.
    tenant_input = page.ele("@placeholder=租户代码", timeout=15)
    if not tenant_input:
        log("ERR", "未找到登录表单")
        dump_page_state(page, "找不到表单")
        return False

    # Visible separator: an empty string repeated 50 times prints nothing.
    separator = "─" * 50
    log("INFO", separator)
    log("INFO", " 请在浏览器中手动输入账号并登录")
    log("INFO", " 程序将在检测到登录成功后自动继续...")
    log("INFO", separator)
    # Manual mode waits up to 2 minutes.
    return _wait_for_login(page, timeout=120)
# ── Wait for the login result (shared by automatic and manual login) ──────────
def _wait_for_login(page: ChromiumPage, timeout: int = 15) -> bool:
    """
    Poll once per second for the outcome of a login attempt.

    A non-empty ``.el-message--error`` element means failure; a missing
    password box means success. The error check comes first.

    Args:
        page: Browser page being observed.
        timeout: Maximum number of one-second polling rounds.

    Returns:
        True on success, False on an error message or when all rounds are
        used up.
    """
    elapsed = 0
    while elapsed < timeout:
        time.sleep(1)
        elapsed += 1

        form_gone = not page.ele("@placeholder=密码", timeout=0)
        banner = page.ele(".el-message--error", timeout=0)
        failure = banner.text.strip() if banner else ""

        if failure:
            log("ERR", f"登录失败: {failure}")
            dump_page_state(page)
            return False
        if form_gone:
            log("OK", f"登录成功({elapsed}s{page.url}")
            return True
        # Progress hint every 10 seconds (useful in manual mode).
        if not elapsed % 10:
            log("INFO", f" 等待登录中... ({elapsed}s)")

    log("WARN", f"超过 {timeout}s 未检测到登录成功")
    dump_page_state(page, "登录超时")
    return False
# ── Entry point when run as a script ──────────────────────────────────────────
if __name__ == "__main__":
    import sys

    # "--manual" on the command line selects the wait-for-user flow.
    manual = "--manual" in sys.argv
    page = get_page()
    try:
        do_login = login_manual if manual else login
        ok = do_login(page)
        if not ok:
            log("ERR", "登录失败")
            time.sleep(3)
        else:
            log("OK", "登录完成,按 Enter 关闭浏览器")
            input()
    except KeyboardInterrupt:
        log("INFO", "用户中断")
    finally:
        # Always close the browser, whatever happened above.
        page.quit()
        log("INFO", "浏览器已关闭")