"""ERP login module built on DrissionPage.

Provides browser start-up with several recovery strategies (``get_page``),
an automatic login that uses credentials from ``.env`` (``login``), and a
manual login mode that waits for the user (``login_manual``).
"""
import os
import sys
import time
import shutil
import json
import datetime
import subprocess
import urllib.request
from pathlib import Path

from dotenv import load_dotenv
from DrissionPage import ChromiumPage, ChromiumOptions
import DrissionPage._base.driver as dp_driver_module
from DrissionPage._base.chromium import Chromium, handle_options
from DrissionPage._base.driver import BrowserDriver
from DrissionPage._pages.chromium_tab import ChromiumTab
from websocket import create_connection as raw_ws_create_connection, WebSocketBadStatusException

# ── Load .env ─────────────────────────────────────────────────────────────────
load_dotenv(Path(__file__).parent / ".env")

# Force Python's websocket client to bypass any local proxy, which otherwise
# can lead to "Handshake status 404 Not Found".
os.environ["NO_PROXY"] = "localhost,127.0.0.1,::1"
os.environ["no_proxy"] = "localhost,127.0.0.1,::1"

ERP_URL = os.getenv("ERP_URL", "https://yunmes.tftykj.cn/#")
ERP_TENANT = os.getenv("ERP_TENANT", "")
ERP_USERNAME = os.getenv("ERP_USERNAME", "")
ERP_PASSWORD = os.getenv("ERP_PASSWORD", "")


def is_docker_env() -> bool:
    """Return True when the process runs inside a Docker container."""
    return os.path.exists("/.dockerenv")


def is_linux_env() -> bool:
    """Return True when the process runs on Linux."""
    return sys.platform.startswith("linux")


def patch_drission_ws_handshake() -> None:
    """Install a more tolerant WebSocket ``create_connection`` on Linux.

    The replacement tries several keyword-argument variants in turn so that
    a DevTools handshake rejected with a bad HTTP status can be retried with
    different Origin settings. The patch is applied at most once; the marker
    attribute ``_DTSK_WS_PATCHED`` on the DrissionPage driver module records
    that it has been installed.
    """
    if getattr(dp_driver_module, "_DTSK_WS_PATCHED", False):
        return

    def resilient_create_connection(address, **kwargs):
        """Connect to *address*, falling back across handshake strategies.

        Raises the last error seen when every strategy fails. A
        ``WebSocketBadStatusException`` moves on to the next strategy; any
        other exception aborts the loop immediately.
        """
        log("INFO", f"[DEBUG-WS] websocket-client trying to connect to: {address}")

        base_kwargs = dict(kwargs)
        # Disable proxies explicitly so the request cannot be redirected by
        # network rules inside the container.
        base_kwargs["http_no_proxy"] = ["127.0.0.1", "localhost", "::1"]
        base_kwargs["http_proxy_host"] = None
        base_kwargs["http_proxy_port"] = None

        # Per the original author's note, Chrome 149 validates strictly and
        # answers 404 when suppress_origin=True arrives without a valid
        # Host/Origin, so prepare variants without the caller's value.
        clean_kwargs = dict(base_kwargs)
        clean_kwargs.pop("suppress_origin", None)

        candidate_kwargs = [
            # Strategy 1: cleanest connection, suppress_origin forced off.
            {**clean_kwargs, "suppress_origin": False},
            # Strategy 2: send an explicit Origin header.
            {**clean_kwargs, "suppress_origin": False, "header": ["Origin: http://127.0.0.1"]},
            # Strategy 3: honour the caller's arguments (DrissionPage default).
            {**base_kwargs},
        ]

        # Always use 127.0.0.1: inside Docker, localhost may resolve oddly.
        # The rewrite does not depend on the strategy, so compute it once.
        target_url = address.replace("localhost", "127.0.0.1")

        last_err = None
        for i, candidate in enumerate(candidate_kwargs, 1):
            try:
                log("INFO", f"[DEBUG-WS] 尝试策略 {i} 连接 {target_url} (suppress_origin={candidate.get('suppress_origin')}, header={candidate.get('header')})")
                return raw_ws_create_connection(target_url, **candidate)
            except WebSocketBadStatusException as ws_err:
                log("WARN", f"[DEBUG-WS] 策略 {i} 失败 (WebSocketBadStatusException): {ws_err}")
                last_err = ws_err
            except Exception as other_err:
                # Not a handshake-status problem: other strategies will not help.
                log("WARN", f"[DEBUG-WS] 策略 {i} 失败 (Exception): {other_err}")
                last_err = other_err
                break
        raise last_err

    if is_linux_env():
        # Replace the name DrissionPage's driver module resolves at call time.
        dp_driver_module.create_connection = resilient_create_connection
        # Also override websocket-client's own attribute for modules that
        # call ``websocket.create_connection`` directly.
        import websocket
        websocket.create_connection = resilient_create_connection
        dp_driver_module._DTSK_WS_PATCHED = True


patch_drission_ws_handshake()


def get_docker_tmp_root() -> Path:
    """Return (and create) the DrissionPage temp root used in Docker.

    The directory is ``$DRISSION_TMP_ROOT/DrissionPage`` and defaults to
    ``/tmp/DrissionPage``. It is passed to ``ChromiumOptions.set_tmp_path``
    together with ``auto_port``.
    """
    tmp_root = Path(os.getenv("DRISSION_TMP_ROOT", "/tmp")) / "DrissionPage"
    tmp_root.mkdir(parents=True, exist_ok=True)
    return tmp_root


def resolve_browser_path() -> str:
    """Resolve the browser executable path, or return "" when none is found.

    Environment variables (``DRISSION_BROWSER_PATH``, ``CHROME_BIN``,
    ``BROWSER_PATH``) win over built-in candidates. On Linux, Google Chrome
    is listed before Chromium; the original author's note says this avoids
    DevTools WebSocket handshake incompatibilities with Chromium.
    """
    env_candidates = [
        os.getenv("DRISSION_BROWSER_PATH", "").strip(),
        os.getenv("CHROME_BIN", "").strip(),
        os.getenv("BROWSER_PATH", "").strip(),
    ]
    for candidate in env_candidates:
        if candidate and os.path.exists(candidate):
            return candidate

    if is_linux_env():
        browser_candidates = [
            "/usr/bin/google-chrome",
            "/usr/bin/google-chrome-stable",
            "/usr/bin/chromium",
            "/usr/bin/chromium-browser",
        ]
    else:
        browser_candidates = [
            "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
            shutil.which("google-chrome") or "",
            shutil.which("chromium") or "",
            shutil.which("chromium-browser") or "",
        ]

    for candidate in browser_candidates:
        if candidate and os.path.exists(candidate):
            return candidate
    return ""


def cleanup_debug_port(address: str) -> None:
    """Kill whatever holds the DevTools port of *address* and drop lock files.

    *address* is expected as ``host:port``. Nothing happens when it is empty
    or contains no colon. When the text after the last colon is not a plain
    number (for example a ``ws://`` URL with a path), the kill step is
    skipped instead of interpolating that text into a shell command.
    """
    if not address or ":" not in address:
        return

    debug_port = address.rsplit(":", 1)[-1].strip()

    # 1. Kill zombie processes that occupy the port. The port is validated
    #    because it is interpolated into a shell pipeline.
    if debug_port.isascii() and debug_port.isdigit():
        subprocess.run(
            f"lsof -ti tcp:{debug_port} | xargs -r kill -9",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    else:
        log("WARN", f"[DEBUG] 无法从地址解析调试端口,跳过进程清理: {address}")

    # 2. On Linux, remove leftover DrissionPage/Chromium temp entries to
    #    avoid "Failed to create SingletonLock".
    if is_linux_env():
        subprocess.run(
            "rm -rf /tmp/DrissionPage* /tmp/.org.chromium.*",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )


def clear_drission_singletons() -> None:
    """Empty DrissionPage's class-level object caches.

    Per the original author's note, half-initialised objects from a failed
    start remain in these caches and would be reused by a later retry. The
    caches are private attributes, so each one is looked up defensively and
    skipped when it is absent.
    """
    cache_owners = (
        (Chromium, "_BROWSERS"),
        (ChromiumPage, "_PAGES"),
        (ChromiumTab, "_TABS"),
        (BrowserDriver, "BROWSERS"),
    )
    for owner, attr_name in cache_owners:
        cache = getattr(owner, attr_name, None)
        if cache is not None:
            cache.clear()


def probe_devtools_endpoints(address: str, log_output: bool = True) -> dict:
    """Query the DevTools HTTP endpoints ``json/version`` and ``json/list``.

    Returns a dict with keys ``"version"`` and ``"list"`` holding the parsed
    JSON of each endpoint, or ``None`` for an endpoint that could not be
    fetched or parsed. Failures never raise; they are logged when
    *log_output* is true.
    """
    result = {"version": None, "list": None}
    if not address:
        return result

    endpoint_map = {"version": "json/version", "list": "json/list"}
    for key, endpoint in endpoint_map.items():
        url = f"http://{address}/{endpoint}"
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                body = resp.read().decode("utf-8", errors="replace")
                if log_output:
                    log("WARN", f"[DEBUG] DevTools 探测 {url} -> HTTP {resp.status}")
                    log("WARN", f"[DEBUG] DevTools 响应体 {endpoint}: {body[:1000]}")
                result[key] = json.loads(body)
        except Exception as probe_err:
            # Best-effort diagnostics: a failed probe must not abort start-up.
            if log_output:
                log("WARN", f"[DEBUG] DevTools 探测失败 {url}: {probe_err}")
    return result


def get_first_page_ws_address(devtools_payload: dict) -> str:
    """Return the ws URL of the first usable page/webview target, or "".

    *devtools_payload* is the dict produced by ``probe_devtools_endpoints``;
    only its ``"list"`` entry is read. Targets whose URL starts with
    ``devtools://`` and targets without ``webSocketDebuggerUrl`` are skipped.
    """
    tabs = devtools_payload.get("list") or []
    if not isinstance(tabs, list):
        return ""

    for tab in tabs:
        if (
            isinstance(tab, dict)
            and tab.get("type") in ("page", "webview")
            and not str(tab.get("url", "")).startswith("devtools://")
            and tab.get("webSocketDebuggerUrl")
        ):
            return tab["webSocketDebuggerUrl"]
    return ""


# ── Logging ───────────────────────────────────────────────────────────────────
def log(level: str, msg: str) -> None:
    """Print *msg* with a millisecond timestamp and a per-level icon."""
    icons = {"INFO": "ℹ️ ", "OK": "✅", "WARN": "⚠️ ", "ERR": "❌"}
    ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{ts}] {icons.get(level, ' ')} [{level}] {msg}", flush=True)


def dump_page_state(page: ChromiumPage, label: str = "") -> None:
    """Log a snapshot of the page (URL, title, error toasts) for debugging.

    Intended for failure paths only. Reading the page is best-effort: if the
    page itself raises, that is logged rather than propagated, so the
    snapshot cannot mask the failure that triggered it.
    """
    tag = f" ({label})" if label else ""
    log("WARN", f"─── 页面快照{tag} ───")
    try:
        log("WARN", f" URL : {page.url}")
        log("WARN", f" 标题 : {page.title}")
        # Error toasts; timeout=0 so the lookup does not block.
        for sel in [".el-message--error", ".error-msg"]:
            el = page.ele(sel, timeout=0)
            if el and el.text.strip():
                log("WARN", f" 页面提示: {el.text.strip()}")
    except Exception as snapshot_err:
        log("WARN", f" 页面快照获取失败: {snapshot_err}")
    log("WARN", "─────────────────────────────")


# ── Browser ───────────────────────────────────────────────────────────────────
def _build_primary_options(
    headless: bool, browser_path: str, is_docker: bool, port: int
) -> ChromiumOptions:
    """Build the full ChromiumOptions used for the first launch attempt."""
    co = ChromiumOptions()

    if headless:
        co.set_argument("--headless=new")
        co.set_argument("--disable-gpu")

    # Per the original author's note: works around Chrome crashing silently
    # on Linux when a zombie process still holds the SingletonLock.
    if is_linux_env():
        co.set_argument("--disable-features=ProcessSingleton")

    for flag in (
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-software-rasterizer",
        "--remote-allow-origins=*",
        "--remote-debugging-address=127.0.0.1",
        "--disable-web-security",
        "--ignore-certificate-errors",
        "--proxy-server=direct://",
        "--proxy-bypass-list=*",
        "--window-size=1440,900",
    ):
        co.set_argument(flag)

    if browser_path:
        co.set_browser_path(browser_path)
        log("INFO", f"选用浏览器内核: {browser_path}")
    else:
        log("WARN", "未解析到明确浏览器路径,将使用 DrissionPage 默认浏览器发现逻辑。")

    if is_docker:
        tmp_root = get_docker_tmp_root()
        co.set_tmp_path(str(tmp_root))
        co.auto_port(True)
        log("INFO", f"Docker Drission 临时目录: {tmp_root}")
    else:
        co.set_local_port(port)
    return co


def _build_retry_options(headless: bool, browser_path: str) -> ChromiumOptions:
    """Build fresh, minimal ChromiumOptions for the post-cleanup retry."""
    retry_co = ChromiumOptions()
    # Honour the caller's headless choice: forcing headless here would make a
    # manual login impossible after a retry on a desktop machine.
    if headless:
        retry_co.set_argument("--headless=new")
        retry_co.set_argument("--disable-gpu")
    retry_co.set_argument("--no-sandbox")
    retry_co.set_argument("--disable-dev-shm-usage")
    retry_co.set_argument("--remote-allow-origins=*")
    retry_co.set_argument("--remote-debugging-address=127.0.0.1")
    retry_co.auto_port(True)
    if browser_path:
        retry_co.set_browser_path(browser_path)
    return retry_co


def get_page(headless: bool = False, port: int = 9222) -> ChromiumPage:
    """Start (or attach to) a browser and return a ``ChromiumPage``.

    Args:
        headless: Run without a visible window. Always treated as true
            inside Docker.
        port: Local DevTools port used outside Docker. Inside Docker an
            automatically chosen port is used instead.

    Returns:
        A connected ``ChromiumPage``.

    Raises:
        Exception: The last error seen when the first attempt, the optional
            Page-WS fallback and the clean restart have all failed.
    """
    is_docker = is_docker_env()
    browser_path = resolve_browser_path()
    effective_headless = headless or is_docker

    if is_docker:
        clear_drission_singletons()
        # Forcefully remove leftover Chrome processes and DrissionPage temp
        # files so zombie browsers cannot interfere with auto_port.
        subprocess.run("pkill -9 -f chrome || true", shell=True, stderr=subprocess.DEVNULL)
        subprocess.run("rm -rf /tmp/DrissionPage*", shell=True, stderr=subprocess.DEVNULL)

    co = _build_primary_options(effective_headless, browser_path, is_docker, port)

    # #region debug-point A:drission-target
    opt = handle_options(co)
    log(
        "INFO",
        "[DEBUG] Chromium 连接参数: "
        f"address={opt.address or ''}, "
        f"browser_path={opt.browser_path or ''}, "
        f"auto_port={opt.is_auto_port}, "
        f"headless={opt.is_headless}, "
        f"user_data_path={getattr(opt, 'user_data_path', '') or ''}"
    )
    log(
        "INFO",
        "[DEBUG] Chromium 启动参数: " + " | ".join(opt.arguments)
    )
    # #endregion

    try:
        # local_port may be absent, hence getattr.
        log("INFO", f"[DEBUG] 准备实例化 ChromiumPage, address={opt.address}, local_port={getattr(opt, 'local_port', 'auto')}")
        page = ChromiumPage(opt)
        log("OK", "[DEBUG] ChromiumPage 实例化成功!")
        return page
    except Exception as first_err:
        last_err = first_err
        actual_address = opt.address or f"127.0.0.1:{port}"
        log("WARN", f"[DEBUG] ChromiumPage 初始化失败 (第一次尝试): {first_err}")

        if opt.address:
            devtools_payload = probe_devtools_endpoints(actual_address, log_output=True)
        else:
            devtools_payload = {"version": None, "list": None}
        fallback_page_ws = get_first_page_ws_address(devtools_payload)

        if (
            is_linux_env()
            and fallback_page_ws
            and "Handshake status 404 Not Found" in str(first_err)
        ):
            log("WARN", f"[DEBUG] Browser WS 握手失败,尝试降级连接 Page WS: {fallback_page_ws}")
            try:
                clear_drission_singletons()
                fallback_co = ChromiumOptions()
                # NOTE(review): set_address receives a full ws:// URL here;
                # confirm DrissionPage accepts that form.
                fallback_co.set_address(fallback_page_ws)
                page = ChromiumPage(fallback_co)
                log("OK", "[DEBUG] 已通过 Page WS 降级连接成功。")
                return page
            except Exception as ws_only_err:
                log("ERR", f"[DEBUG] Page WS 降级连接失败: {ws_only_err}")
                last_err = ws_only_err

        log("WARN", "[DEBUG] 尝试清理并完全重启浏览器...")
        try:
            clear_drission_singletons()
            cleanup_debug_port(actual_address)
            # NOTE(review): this kills every process matching "chrome", also
            # outside Docker — confirm that is intended on developer machines.
            subprocess.run("pkill -9 -f chrome || true", shell=True, stderr=subprocess.DEVNULL)
            time.sleep(1)

            # Do not reuse ``opt``: it may still carry the previous ws
            # address. Build fresh options rather than recursing.
            page = ChromiumPage(_build_retry_options(effective_headless, browser_path))
            log("OK", "[DEBUG] 清理后重试实例化 ChromiumPage 成功!")
            return page
        except Exception as retry_err:
            log("ERR", f"[DEBUG] 清理后重试依然失败 (第二次尝试): {retry_err}")
            last_err = retry_err

        # #region debug-point B:devtools-http-probe
        if not opt.address:
            log("WARN", "[DEBUG] DevTools 探测跳过:address 为空")
        # #endregion

        log("ERR", f"浏览器初始化失败: {last_err}")
        raise last_err


# ── Vue form input (JS setter + simulated keyboard) ──────────────────────────
def set_input_value(page: ChromiumPage, ele, value: str) -> None:
    """Clear *ele* through the native value setter, then type *value*.

    Per the original author's note, ``clear()`` + ``input()`` leaves stale
    text under Vue two-way binding, so the field is emptied with the native
    ``HTMLInputElement`` value setter plus an ``input`` event, and the new
    value is entered with simulated keystrokes.
    """
    page.run_js(
        """
        var el = arguments[0];
        Object.getOwnPropertyDescriptor(
            window.HTMLInputElement.prototype, 'value'
        ).set.call(el, '');
        el.dispatchEvent(new Event('input', {bubbles: true}));
        """,
        ele,
    )
    time.sleep(0.05)  # give Vue time to process the clear event
    ele.input(value)


# ── Automatic login (credentials from .env) ──────────────────────────────────
def login(page: ChromiumPage) -> bool:
    """Fill in the credentials from ``.env`` and log in.

    Returns True on success or when no login form is present (treated as
    already logged in), False when a form element is missing or the login
    does not succeed in time.
    """
    log("INFO", f"打开登录页: {ERP_URL}")
    page.get(ERP_URL)

    # Wait briefly for the login form; if it never shows up, check whether
    # the session is already logged in.
    tenant_input = page.ele("@placeholder=租户代码", timeout=5)
    if not tenant_input:
        # No tenant field: a page without a password box counts as logged in.
        has_pwd = bool(page.ele("@placeholder=密码", timeout=0))
        if not has_pwd:
            log("OK", f"检测到已登录状态,跳过登录 → {page.url}")
            return True
        log("ERR", "未找到登录表单,页面可能未加载")
        dump_page_state(page, "找不到表单")
        return False

    log("INFO", "填写登录信息...")
    set_input_value(page, tenant_input, ERP_TENANT)
    for placeholder, value in (("账号", ERP_USERNAME), ("密码", ERP_PASSWORD)):
        field = page.ele(f"@placeholder={placeholder}")
        if not field:
            # Fail with a clear message instead of an error inside the setter.
            log("ERR", f"未找到输入框: {placeholder}")
            dump_page_state(page, "找不到表单")
            return False
        set_input_value(page, field, value)
    time.sleep(0.2)  # let the change/blur events of all fields settle

    # Click the login button.
    login_btn = page.ele(
        "xpath://*[@id='app']/div/div[3]/div/div[2]/div[2]/div[5]/button",
        timeout=5
    )
    if not login_btn:
        log("ERR", "未找到登录按钮(XPath 失效,请更新选择器)")
        dump_page_state(page, "找不到登录按钮")
        return False

    log("INFO", "点击登录...")
    login_btn.click()

    return _wait_for_login(page)


# ── Manual login (wait for the user to act in the browser) ───────────────────
def login_manual(page: ChromiumPage) -> bool:
    """Open the login page and wait for the user to log in by hand.

    Returns False when the login form never appears or the login is not
    detected within two minutes.
    """
    log("INFO", f"打开登录页(手动模式): {ERP_URL}")
    page.get(ERP_URL)

    # Make sure the login page has loaded.
    tenant_input = page.ele("@placeholder=租户代码", timeout=15)
    if not tenant_input:
        log("ERR", "未找到登录表单")
        dump_page_state(page, "找不到表单")
        return False

    log("INFO", "═" * 50)
    log("INFO", " 请在浏览器中手动输入账号并登录")
    log("INFO", " 程序将在检测到登录成功后自动继续...")
    log("INFO", "═" * 50)

    return _wait_for_login(page, timeout=120)  # manual mode waits 2 minutes


# ── Wait for the login result (shared by both modes) ─────────────────────────
def _wait_for_login(page: ChromiumPage, timeout: int = 15) -> bool:
    """Poll once per second for the login outcome.

    An error toast with text means failure; a vanished password field means
    success. Returns False when neither happens within *timeout* seconds.
    """
    for elapsed in range(1, timeout + 1):
        time.sleep(1)

        has_form = bool(page.ele("@placeholder=密码", timeout=0))
        err_el = page.ele(".el-message--error", timeout=0)
        err_text = err_el.text.strip() if err_el else ""

        if err_text:
            log("ERR", f"登录失败: {err_text}")
            dump_page_state(page)
            return False

        if not has_form:
            log("OK", f"登录成功({elapsed}s)→ {page.url}")
            return True

        if elapsed % 10 == 0:
            # Progress note every 10 seconds (useful in manual mode).
            log("INFO", f" 等待登录中... ({elapsed}s)")

    log("WARN", f"超过 {timeout}s 未检测到登录成功")
    dump_page_state(page, "登录超时")
    return False


# ── Entry point when run as a script ─────────────────────────────────────────
if __name__ == "__main__":
    manual = "--manual" in sys.argv
    page = get_page()
    try:
        ok = login_manual(page) if manual else login(page)
        if ok:
            log("OK", "登录完成,按 Enter 关闭浏览器")
            try:
                input()
            except EOFError:
                # stdin is not interactive (e.g. in a container): just go on
                # to close the browser.
                log("INFO", "标准输入不可用,直接关闭浏览器")
        else:
            log("ERR", "登录失败")
            time.sleep(3)
    except KeyboardInterrupt:
        log("INFO", "用户中断")
    finally:
        page.quit()
        log("INFO", "浏览器已关闭")