Files
datie-bom/browser_login/login.py
2026-06-26 17:20:04 +08:00

483 lines
18 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ERP 登录模块 - DrissionPage
"""
import os
import sys
import time
import shutil
import json
import datetime
import subprocess
import urllib.request
from pathlib import Path
from dotenv import load_dotenv
from DrissionPage import ChromiumPage, ChromiumOptions
import DrissionPage._base.driver as dp_driver_module
from DrissionPage._base.chromium import Chromium, handle_options
from DrissionPage._base.driver import BrowserDriver
from DrissionPage._pages.chromium_tab import ChromiumTab
from websocket import create_connection as raw_ws_create_connection, WebSocketBadStatusException
# ── Load .env ─────────────────────────────────────────────────────────────────
# The .env file is expected to sit next to this module.
load_dotenv(Path(__file__).with_name(".env"))

# Force Python's websocket client to bypass any local proxy for loopback hosts,
# which otherwise can surface as "Handshake status 404 Not Found".
os.environ.update(
    dict.fromkeys(("NO_PROXY", "no_proxy"), "localhost,127.0.0.1,::1")
)

# ERP connection settings, read from the environment (after .env was loaded).
ERP_URL = os.environ.get("ERP_URL", "https://yunmes.tftykj.cn/#")
ERP_TENANT = os.environ.get("ERP_TENANT", "")
ERP_USERNAME = os.environ.get("ERP_USERNAME", "")
ERP_PASSWORD = os.environ.get("ERP_PASSWORD", "")
def is_docker_env() -> bool:
    """Return True when the ``/.dockerenv`` marker file exists.

    The presence of that file is the only signal used to decide that the
    process is running inside a Docker container.
    """
    marker_file = "/.dockerenv"
    return os.path.exists(marker_file)
def is_linux_env() -> bool:
    """Return True when ``sys.platform`` names a Linux platform."""
    platform_name = sys.platform
    return platform_name.startswith("linux")
def patch_drission_ws_handshake() -> None:
    """
    Install a more tolerant WebSocket ``create_connection`` into DrissionPage.

    On Linux, ``create_connection`` in ``DrissionPage._base.driver`` is replaced
    by a wrapper that retries the DevTools handshake with several Origin-header
    strategies. The ``_DTSK_WS_PATCHED`` flag stored on the driver module makes
    repeated calls a no-op once the patch is installed. On other platforms
    nothing is changed.
    """
    if getattr(dp_driver_module, "_DTSK_WS_PATCHED", False):
        return

    def resilient_create_connection(address, **kwargs):
        """Open a WebSocket to ``address``, trying each handshake strategy in turn.

        Returns the first connection that opens. When a strategy is rejected
        with ``WebSocketBadStatusException`` the next one is tried; any other
        exception stops the retries. The last recorded error is raised when no
        strategy succeeds.
        """
        base_kwargs = dict(kwargs)
        # Always connect directly: keep container network/proxy rules from
        # redirecting the loopback request.
        base_kwargs["http_no_proxy"] = ["127.0.0.1", "localhost", "::1"]
        base_kwargs["http_proxy_host"] = None
        base_kwargs["http_proxy_port"] = None

        # Original author's notes: newer Chrome tightened the DevTools security
        # checks (Host must be an IP or localhost and include the port).
        # websocket-client generates the Host header itself, so none is added
        # here -- a duplicate Host header gets the request rejected.
        candidate_kwargs = [
            # Strategy 1: standard loopback Origin, no manual Host header.
            {
                **base_kwargs,
                "suppress_origin": False,
                "header": ["Origin: http://127.0.0.1"],
            },
            # Strategy 2: the barest possible request (curl-like), no Origin.
            {
                **base_kwargs,
                "suppress_origin": True,
                "header": [],
            },
            # Strategy 3: present the Origin as localhost.
            {
                **base_kwargs,
                "suppress_origin": False,
                "header": ["Origin: http://localhost"],
            },
        ]

        # Always dial 127.0.0.1: inside Docker "localhost" may resolve oddly.
        # The target does not depend on the strategy, so compute it once.
        target_url = address.replace("localhost", "127.0.0.1")

        last_err = None
        for candidate in candidate_kwargs:
            try:
                return raw_ws_create_connection(target_url, **candidate)
            except WebSocketBadStatusException as ws_err:
                # Handshake rejected by the server: fall through to the next strategy.
                last_err = ws_err
            except Exception as other_err:
                # Anything else is not a handshake-policy problem; stop retrying.
                last_err = other_err
                break
        raise last_err

    if is_linux_env():
        dp_driver_module.create_connection = resilient_create_connection
        dp_driver_module._DTSK_WS_PATCHED = True
# Apply the WebSocket handshake patch once, when this module is imported.
patch_drission_ws_handshake()
def get_docker_tmp_root() -> Path:
    """
    Return (creating it if needed) the DrissionPage temp root used in Docker.

    The directory is ``<DRISSION_TMP_ROOT>/DrissionPage``, where
    ``DRISSION_TMP_ROOT`` defaults to ``/tmp``. ``get_page`` hands this path to
    ``set_tmp_path`` before enabling ``auto_port``.
    """
    base_dir = os.getenv("DRISSION_TMP_ROOT", "/tmp")
    drission_dir = Path(base_dir).joinpath("DrissionPage")
    drission_dir.mkdir(parents=True, exist_ok=True)
    return drission_dir
def resolve_browser_path() -> str:
    """
    Resolve the browser executable to launch.

    Lookup order:
      1. ``DRISSION_BROWSER_PATH``, ``CHROME_BIN``, ``BROWSER_PATH`` (first
         one that points at an existing path wins).
      2. On Linux, fixed install locations with Google Chrome listed before
         Chromium (the original author prefers Chrome Stable to avoid DevTools
         WebSocket handshake incompatibilities between Chromium and
         DrissionPage).
      3. Elsewhere, the macOS Chrome bundle and whatever ``shutil.which``
         finds on ``PATH``.

    Returns an empty string when nothing is found.
    """

    def _first_existing(paths):
        # First non-empty candidate that exists on disk, else "".
        return next((p for p in paths if p and os.path.exists(p)), "")

    env_names = ("DRISSION_BROWSER_PATH", "CHROME_BIN", "BROWSER_PATH")
    from_env = _first_existing([os.getenv(name, "").strip() for name in env_names])
    if from_env:
        return from_env

    if is_linux_env():
        well_known = [
            "/usr/bin/google-chrome",
            "/usr/bin/google-chrome-stable",
            "/usr/bin/chromium",
            "/usr/bin/chromium-browser",
        ]
    else:
        well_known = ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"]
        well_known += [
            shutil.which(binary) or ""
            for binary in ("google-chrome", "chromium", "chromium-browser")
        ]
    return _first_existing(well_known)
def cleanup_debug_port(address: str) -> None:
    """
    Best-effort cleanup of a stale browser bound to a DevTools address.

    Args:
        address: DevTools address in ``host:port`` form. Empty values or
            values without a ``:`` are ignored.

    Steps:
      1. Kill every process listening on the TCP port (``lsof`` + ``kill -9``).
      2. On Linux, remove leftover DrissionPage / Chromium temp entries under
         ``/tmp`` so Chrome does not fail with "Failed to create SingletonLock".

    Command output and failures are discarded; nothing is returned.
    """
    if not address or ":" not in address:
        return
    debug_port = address.rsplit(":", 1)[-1]

    # 1. Kill zombie processes holding the port. The value is interpolated into
    #    a shell pipeline, so only a purely numeric port is ever passed through;
    #    anything else (path suffix, shell metacharacters) skips this step.
    if debug_port.isascii() and debug_port.isdigit():
        subprocess.run(
            f"lsof -ti tcp:{debug_port} | xargs -r kill -9",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    # 2. On Linux, wipe leftover DrissionPage lock/temp files to prevent
    #    "Failed to create SingletonLock".
    if is_linux_env():
        subprocess.run(
            "rm -rf /tmp/DrissionPage* /tmp/.org.chromium.*",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
def clear_drission_singletons() -> None:
    """
    Empty DrissionPage's class-level object caches.

    Per the original author: a half-initialised object left behind by a failed
    start stays in these caches, and a later retry would wrongly reuse it.
    """
    registries = (
        (Chromium, "_BROWSERS"),
        (ChromiumPage, "_PAGES"),
        (ChromiumTab, "_TABS"),
        (BrowserDriver, "BROWSERS"),
    )
    # Resolve each attribute lazily so the caches are cleared strictly in order.
    for owner, attr_name in registries:
        getattr(owner, attr_name).clear()
def probe_devtools_endpoints(address: str, log_output: bool = True) -> dict:
    """
    Query the DevTools HTTP endpoints ``json/version`` and ``json/list``.

    Args:
        address: ``host:port`` of the DevTools HTTP server. An empty value
            skips probing.
        log_output: When True, log the HTTP status, the first 1000 characters
            of each body, and any probe failure.

    Returns:
        A dict with keys ``"version"`` and ``"list"`` holding the decoded JSON
        of each endpoint, or ``None`` where the probe failed or was skipped.
    """
    probed = dict.fromkeys(("version", "list"))
    if not address:
        return probed

    for key in ("version", "list"):
        endpoint = f"json/{key}"
        url = f"http://{address}/{endpoint}"
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                raw = resp.read().decode("utf-8", errors="replace")
                if log_output:
                    log("WARN", f"[DEBUG] DevTools 探测 {url} -> HTTP {resp.status}")
                    log("WARN", f"[DEBUG] DevTools 响应体 {endpoint}: {raw[:1000]}")
            probed[key] = json.loads(raw)
        except Exception as probe_err:
            # Probing is diagnostic only: record nothing for this key and move on.
            if log_output:
                log("WARN", f"[DEBUG] DevTools 探测失败 {url}: {probe_err}")
    return probed
def get_first_page_ws_address(devtools_payload: dict) -> str:
    """
    Pick the WebSocket debugger URL of the first usable tab.

    ``devtools_payload["list"]`` is expected to hold the decoded ``/json/list``
    response. A tab is usable when it is a dict of type ``page`` or
    ``webview``, its URL does not start with ``devtools://`` and it carries a
    non-empty ``webSocketDebuggerUrl``. Returns ``""`` when no tab qualifies
    or the payload is not a list.
    """
    entries = devtools_payload.get("list") or []
    if not isinstance(entries, list):
        return ""

    def _is_usable(entry) -> bool:
        if not isinstance(entry, dict):
            return False
        if entry.get("type") not in ("page", "webview"):
            return False
        if str(entry.get("url", "")).startswith("devtools://"):
            return False
        return bool(entry.get("webSocketDebuggerUrl"))

    return next(
        (entry["webSocketDebuggerUrl"] for entry in entries if _is_usable(entry)),
        "",
    )
# ── 日志 ──────────────────────────────────────────────────────────────────────
def log(level: str, msg: str):
    """Print one timestamped log line to stdout and flush immediately.

    Args:
        level: Level tag shown in brackets; also selects the line's icon.
        msg: Message text.
    """
    icons = {"INFO": " ", "OK": "", "WARN": "⚠️ ", "ERR": ""}
    icon = icons.get(level, ' ')
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    stamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    line = f"[{stamp}] {icon} [{level}] {msg}"
    print(line, flush=True)
def dump_page_state(page: ChromiumPage, label: str = ""):
    """Log a short snapshot of the page; meant to be called only on errors.

    Logs the current URL and title, plus the text of any visible error hint
    (``.el-message--error`` / ``.error-msg``).

    Args:
        page: The page to inspect.
        label: Optional tag appended to the snapshot header.
    """
    suffix = f" ({label})" if label else ""
    log("WARN", f"─── 页面快照{suffix} ───")
    log("WARN", f" URL : {page.url}")
    log("WARN", f" 标题 : {page.title}")

    # Error hints: timeout=0 so the lookup never blocks.
    for selector in (".el-message--error", ".error-msg"):
        hint = page.ele(selector, timeout=0)
        if not hint:
            continue
        hint_text = hint.text.strip()
        if hint_text:
            log("WARN", f" 页面提示: {hint_text}")

    log("WARN", "─────────────────────────────")
# ── 浏览器 ────────────────────────────────────────────────────────────────────
def get_page(headless: bool = False, port: int = 9222) -> ChromiumPage:
    """
    Configure Chromium, connect to it and return a ``ChromiumPage``.

    Args:
        headless: Request headless mode. Headless is always used inside Docker.
        port: Local DevTools port used outside Docker. Inside Docker
            ``auto_port`` is enabled and this value only serves as the
            fallback address for cleanup when ``opt.address`` is empty.

    Returns:
        The connected ``ChromiumPage``.

    Raises:
        Exception: When the initial connection fails and the recovery steps
            (Page-WS fallback on Linux, then port cleanup + retry) fail too.

    NOTE(review): the final bare ``raise`` re-raises the exception of the first
    ``ChromiumPage(opt)`` attempt, while the preceding log line prints the most
    recent error stored in ``e`` -- confirm this is intended.
    """
    co = ChromiumOptions()
    is_docker = is_docker_env()
    browser_path = resolve_browser_path()
    # Headless is forced inside Docker regardless of the argument.
    effective_headless = headless or is_docker
    if is_docker:
        # Start from clean DrissionPage caches inside Docker.
        clear_drission_singletons()
    if effective_headless:
        co.set_argument("--headless=new")
        co.set_argument("--disable-gpu")
    # Works around Chrome silently crashing on Linux because of a SingletonLock
    # left behind by zombie processes.
    if is_linux_env():
        co.set_argument("--disable-features=ProcessSingleton")
    co.set_argument("--disable-blink-features=AutomationControlled")
    co.set_argument("--no-sandbox")
    co.set_argument("--disable-dev-shm-usage")
    co.set_argument("--disable-software-rasterizer")
    co.set_argument("--remote-allow-origins=*")
    co.set_argument("--remote-debugging-address=127.0.0.1")
    co.set_argument("--disable-web-security")
    co.set_argument("--ignore-certificate-errors")
    co.set_argument("--proxy-server=direct://")
    co.set_argument("--proxy-bypass-list=*")
    co.set_argument("--window-size=1440,900")
    if browser_path:
        co.set_browser_path(browser_path)
        log("INFO", f"选用浏览器内核: {browser_path}")
    else:
        log("WARN", "未解析到明确浏览器路径,将使用 DrissionPage 默认浏览器发现逻辑。")
    if is_docker:
        # Docker: dedicated temp root plus an automatically chosen port.
        tmp_root = get_docker_tmp_root()
        co.set_tmp_path(str(tmp_root))
        co.auto_port(True)
        log("INFO", f"Docker Drission 临时目录: {tmp_root}")
    else:
        co.set_local_port(port)
    # #region debug-point A:drission-target
    # Resolve the options up front so the effective address/arguments can be logged.
    opt = handle_options(co)
    log(
        "INFO",
        "[DEBUG] Chromium 连接参数: "
        f"address={opt.address or '<empty>'}, "
        f"browser_path={opt.browser_path or '<auto>'}, "
        f"auto_port={opt.is_auto_port}, "
        f"headless={opt.is_headless}, "
        f"user_data_path={getattr(opt, 'user_data_path', '') or '<auto>'}"
    )
    log(
        "INFO",
        "[DEBUG] Chromium 启动参数: "
        + " | ".join(opt.arguments)
    )
    # #endregion
    try:
        page = ChromiumPage(opt)
        return page
    except Exception as e:
        # Address used for probing/cleanup; falls back to the requested local port.
        actual_address = opt.address or f"127.0.0.1:{port}"
        log("WARN", f"[DEBUG] ChromiumPage 初始化失败: {e}")
        # Probe the DevTools HTTP endpoints only when an address is known.
        devtools_payload = probe_devtools_endpoints(actual_address, log_output=True) if opt.address else {"version": None, "list": None}
        fallback_page_ws = get_first_page_ws_address(devtools_payload)
        # Recovery 1 (Linux only): the browser-level WS handshake returned 404
        # but a page target exists, so connect to that page's WS instead.
        if is_linux_env() and fallback_page_ws and "Handshake status 404 Not Found" in str(e):
            log("WARN", f"[DEBUG] Browser WS 握手失败,尝试降级连接 Page WS: {fallback_page_ws}")
            try:
                clear_drission_singletons()
                fallback_co = ChromiumOptions()
                fallback_co.set_address(fallback_page_ws)
                page = ChromiumPage(fallback_co)
                log("OK", "[DEBUG] 已通过 Page WS 降级连接成功。")
                return page
            except Exception as ws_only_e:
                log("ERR", f"[DEBUG] Page WS 降级连接失败: {ws_only_e}")
                # Keep the latest error for the final log message.
                e = ws_only_e
        # Recovery 2: clear caches, kill whatever holds the port, then retry once.
        log("WARN", f"[DEBUG] 尝试清理地址 {actual_address} 后重试...")
        try:
            clear_drission_singletons()
            cleanup_debug_port(actual_address)
            time.sleep(1)
            page = ChromiumPage(opt)
            log("OK", "[DEBUG] 清理后重试成功!")
            return page
        except Exception as retry_e:
            log("ERR", f"[DEBUG] 清理后重试依然失败: {retry_e}")
            e = retry_e
        # #region debug-point B:devtools-http-probe
        if not opt.address:
            log("WARN", "[DEBUG] DevTools 探测跳过address 为空")
        # #endregion
        log("ERR", f"浏览器初始化失败: {e}")
        raise
# ── Vue 表单专用输入JS setter + 模拟键盘) ──────────────────────────────────
def set_input_value(page: ChromiumPage, ele, value: str):
    """
    Replace the content of a Vue-bound input with ``value``.

    Per the original author, ``clear()`` + ``input()`` leaves residual text
    under Vue two-way binding. The field is therefore emptied through the
    native ``HTMLInputElement`` value setter (followed by an ``input`` event),
    and the new text is then entered via ``ele.input`` so the usual
    keyboard/change events fire.

    Args:
        page: Page used to run the clearing script.
        ele: The input element to fill.
        value: Text to enter.
    """
    clear_script = (
        "\n"
        "var el = arguments[0];\n"
        "Object.getOwnPropertyDescriptor(\n"
        "window.HTMLInputElement.prototype, 'value'\n"
        ").set.call(el, '');\n"
        "el.dispatchEvent(new Event('input', {bubbles: true}));\n"
    )
    page.run_js(clear_script, ele)
    # Give Vue a moment to process the clear event before typing.
    time.sleep(0.05)
    ele.input(value)
# ── 自动登录(使用 .env 账号) ────────────────────────────────────────────────
def login(page: ChromiumPage) -> bool:
    """
    Log in automatically with the credentials loaded from ``.env``.

    Flow:
      1. Open ``ERP_URL`` and wait briefly for the tenant-code input.
      2. If no login form shows up and no password box exists, the session is
         treated as already logged in.
      3. Otherwise fill tenant / account / password, click the login button
         and wait for the outcome via ``_wait_for_login``.

    Args:
        page: Browser page to drive.

    Returns:
        True when logged in (or already logged in); False when a required
        form element is missing or the login attempt fails.
    """
    log("INFO", f"打开登录页: {ERP_URL}")
    page.get(ERP_URL)

    # Wait for the login form with a short timeout; if it never appears,
    # check whether we are already logged in.
    tenant_input = page.ele("@placeholder=租户代码", timeout=5)
    if not tenant_input:
        # No login form -> logged in if the main UI has no password box.
        has_pwd = bool(page.ele("@placeholder=密码", timeout=0))
        if not has_pwd:
            log("OK", f"检测到已登录状态,跳过登录 → {page.url}")
            return True
        log("ERR", "未找到登录表单,页面可能未加载")
        dump_page_state(page, "找不到表单")
        return False

    log("INFO", "填写登录信息...")
    set_input_value(page, tenant_input, ERP_TENANT)
    # Look each remaining field up right before filling it, and fail cleanly
    # (like the tenant input / login button checks) when one is missing,
    # instead of crashing inside set_input_value.
    for placeholder, value in (("账号", ERP_USERNAME), ("密码", ERP_PASSWORD)):
        field = page.ele(f"@placeholder={placeholder}")
        if not field:
            log("ERR", f"未找到输入框: {placeholder}")
            dump_page_state(page, "找不到输入框")
            return False
        set_input_value(page, field, value)
    time.sleep(0.2)  # let every field's change/blur events settle

    # Click the login button.
    login_btn = page.ele(
        "xpath://*[@id='app']/div/div[3]/div/div[2]/div[2]/div[5]/button",
        timeout=5
    )
    if not login_btn:
        log("ERR", "未找到登录按钮XPath 失效,请更新选择器)")
        dump_page_state(page, "找不到登录按钮")
        return False
    log("INFO", "点击登录...")
    login_btn.click()
    return _wait_for_login(page)
# ── 手动登录(等待用户在浏览器操作) ─────────────────────────────────────────
def login_manual(page: ChromiumPage) -> bool:
    """
    Open the login page and wait for the user to log in by hand.

    Args:
        page: Browser page to drive.

    Returns:
        False when the login form never appears; otherwise the result of
        ``_wait_for_login`` with a 120-second timeout.
    """
    log("INFO", f"打开登录页(手动模式): {ERP_URL}")
    page.get(ERP_URL)

    # Make sure the login page has actually loaded.
    form_ready = page.ele("@placeholder=租户代码", timeout=15)
    if not form_ready:
        log("ERR", "未找到登录表单")
        dump_page_state(page, "找不到表单")
        return False

    banner = (
        "" * 50,
        " 请在浏览器中手动输入账号并登录",
        " 程序将在检测到登录成功后自动继续...",
        "" * 50,
    )
    for banner_line in banner:
        log("INFO", banner_line)

    # Manual mode: allow two minutes for the user to finish.
    return _wait_for_login(page, timeout=120)
# ── 等待登录结果(自动/手动共用) ────────────────────────────────────────────
def _wait_for_login(page: ChromiumPage, timeout: int = 15) -> bool:
    """
    Poll once per second until the login outcome is known.

    The password box disappearing means success; a visible
    ``.el-message--error`` with text means failure.

    Args:
        page: Browser page being watched.
        timeout: Maximum number of one-second polling rounds.

    Returns:
        True on success, False on an error message or when the rounds run out.
    """
    elapsed = 0
    while elapsed < timeout:
        elapsed += 1
        time.sleep(1)

        form_present = bool(page.ele("@placeholder=密码", timeout=0))
        error_banner = page.ele(".el-message--error", timeout=0)
        error_text = error_banner.text.strip() if error_banner else ""

        # An error message takes precedence over the form having vanished.
        if error_text:
            log("ERR", f"登录失败: {error_text}")
            dump_page_state(page)
            return False
        if not form_present:
            log("OK", f"登录成功({elapsed}s{page.url}")
            return True
        # Progress note every 10 seconds (useful in manual mode).
        if elapsed % 10 == 0:
            log("INFO", f" 等待登录中... ({elapsed}s)")

    log("WARN", f"超过 {timeout}s 未检测到登录成功")
    dump_page_state(page, "登录超时")
    return False
# ── 单独运行时的入口 ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Stand-alone entry point: `--manual` waits for a hand-typed login,
    # otherwise the credentials from .env are used.
    import sys

    manual = "--manual" in sys.argv
    page = get_page()
    try:
        ok = login_manual(page) if manual else login(page)
        if ok:
            log("OK", "登录完成,按 Enter 关闭浏览器")
            try:
                input()
            except EOFError:
                # stdin is closed or non-interactive (e.g. a container without
                # a TTY): there is nothing to wait for, so just shut down.
                log("INFO", "标准输入不可用,直接关闭浏览器")
        else:
            log("ERR", "登录失败")
            time.sleep(3)
    except KeyboardInterrupt:
        log("INFO", "用户中断")
    finally:
        # Always close the browser, whatever happened above.
        page.quit()
        log("INFO", "浏览器已关闭")