""" 收货明细报表 - 全量分页抓取 (精简字段模式) 目标: 模拟点击菜单,过滤 11 个核心字段,并循环点击下一页,直到所有数据抓取完毕。 """ import sys import json import time import random from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from login import get_page, log from config import OUTPUT_DIR HOME_URL = "https://yunmes.tftykj.cn/" API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy" SAVE_PATH = OUTPUT_DIR / "receipt_details_full_clean.json" def fetch_receipt_details_full(): log("INFO", "=== 🚚 启动收货明细报表全量抓取 (精简字段模式) ===") page = get_page(port=9222) # 尝试加载已有的存档,实现真正的断点累加 all_clean_items = [] if SAVE_PATH.exists(): try: with open(SAVE_PATH, "r", encoding="utf-8") as f: all_clean_items = json.load(f) log("INFO", f"📦 已加载本地历史存档,包含 {len(all_clean_items)} 条数据。") except Exception as e: log("WARN", f"加载本地存档失败: {e},将从空列表开始。") all_clean_items = [] try: log("INFO", f"正在回到主页起点: {HOME_URL}") page.get(HOME_URL) page.wait.load_start() time.sleep(2) menus = [ ("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'), ("第二层: 财务业务报表", 'text:财务业务报表'), ("第三层: 财务收货明细报表", 'text:财务收货明细报表') ] log("INFO", "开始模拟人工点击左侧导航菜单...") for name, xpath in menus: ele = page.ele(xpath, timeout=5) if ele: try: ele.click() except: page.run_js("arguments[0].click();", ele) time.sleep(1.5) else: log("ERR", f"找不到菜单元素: {name}") return log("OK", "✅ 成功点开收货明细报表界面!") # 点击空白处隐藏菜单 blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div' blank_ele = page.ele(blank_xpath, timeout=3) if blank_ele: try: blank_ele.click() except: page.run_js("arguments[0].click();", blank_ele) time.sleep(0.5) log("INFO", f"开启底层数据拦截网: {API_TARGET}") page.listen.start(API_TARGET) packet = page.listen.wait(timeout=10) if not packet: log("INFO", "尝试寻找并点击页面上的【查询】按钮...") query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span' query_btn = page.ele(query_btn_xpath, timeout=3) if query_btn: try: query_btn.click() except: page.run_js("arguments[0].click();", query_btn) packet = page.listen.wait(timeout=15) if not packet: log("ERR", "未能拦截到第一页数据,可能网络超时或查询未触发。") page.listen.stop() return # ========================================================= # 第一页数据处理 (如果触发断点,则忽略第一页数据) # ========================================================= log("OK", f"🎉 成功拦截到第一页数据!HTTP: {packet.response.status}") body = packet.response.body data = body if isinstance(body, (dict, list)) else json.loads(body) # 设定开始抓取的页码,1表示从头开始抓全量数据 target_resume_page = 690 total_count = 0 if isinstance(data, dict) and "result" in data: total_count = data["result"].get("totalCount", 0) items = data["result"].get("items", []) # 只有当不是断点续传(即从第1页开始)时,才把第一页的数据加入列表 if target_resume_page <= 1: for item in items: all_clean_items.append({ "采购订单号": item.get("purchaseOrderCode"), "行号": item.get("rowsNum"), "物料代码": item.get("materialCode"), "物料名称": item.get("materialName"), "物料规格": item.get("materialSpecification"), "仓库代码": item.get("warehouseCode"), "仓库名称": item.get("warehouseName"), "供应商代码": item.get("supplierCode"), "供应商名称": item.get("supplierName"), "单位名称": item.get("unitName"), "转换单位": item.get("convertUnitName"), "收货单价": item.get("receivePrice"), "收货时间": item.get("receiptTime"), "进货数量": item.get("plannedPurchaseQuantity"), "收货数量": item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity"), "收货总金额": item.get("receiveAmount") }) log("OK", f"第一页清洗完成,提取了 {len(items)} 条数据。后端报告总条数: {total_count}") else: log("INFO", f"触发断点续传,跳过第一页的数据保存。后端报告总条数: {total_count}") page_num = 1 # ========================================================= # 断点续传逻辑 (由于刚才中断在 711 页,我们需要跳到 712 页继续) # ========================================================= if target_resume_page > 1: log("INFO", f"🚀 触发断点续传机制!准备直接跳转到第 {target_resume_page} 页...") # 尝试找页码输入框 jumper_input_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/span[3]/div/div//input' input_ele = page.ele(jumper_input_xpath, timeout=5) if not input_ele: jumper_input_xpath = 'xpath://input[@type="number" and @aria-label="页"]' input_ele = page.ele(jumper_input_xpath, timeout=5) if input_ele: input_ele.clear() input_ele.input(str(target_resume_page)) time.sleep(0.5) input_ele.input('\n') packet = page.listen.wait(timeout=15) if not packet: log("ERR", "断点跳转失败,未拦截到目标页的数据请求。") return log("OK", f"✅ 成功跳转至第 {target_resume_page} 页并截获数据!") page_num = target_resume_page # 读取并解析第 191 页的数据 body = packet.response.body data = body if isinstance(body, (dict, list)) else json.loads(body) if isinstance(data, dict) and "result" in data: items = data["result"].get("items", []) for item in items: all_clean_items.append({ "采购订单号": item.get("purchaseOrderCode"), "行号": item.get("rowsNum"), "物料代码": item.get("materialCode"), "物料名称": item.get("materialName"), "物料规格": item.get("materialSpecification"), "仓库代码": item.get("warehouseCode"), "仓库名称": item.get("warehouseName"), "供应商代码": item.get("supplierCode"), "供应商名称": item.get("supplierName"), "单位名称": item.get("unitName"), "转换单位": item.get("convertUnitName"), "收货单价": item.get("receivePrice"), "收货时间": item.get("receiptTime"), "进货数量": item.get("plannedPurchaseQuantity"), "收货数量": item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity"), "收货总金额": item.get("receiveAmount") }) log("OK", f"第 {page_num} 页清洗完成,累计提取 {len(all_clean_items)} 条数据。") else: log("ERR", "找不到页码输入框,断点跳转失败,将从第 1 页继续!") # ========================================================= # 循环翻页抓取 # ========================================================= while True: # 引入“类人”随机延迟(2.5 秒到 5.5 秒之间随机) delay = random.uniform(2.5, 5.5) log("INFO", f"⏳ 模拟真人停顿 {delay:.2f} 秒后,准备点击下一页...") time.sleep(delay) # 偶尔的“长休息”(模拟用户看累了或者喝口水),每抓 50 页额外休息 10-20 秒 if page_num > 1 and page_num % 50 == 0: long_delay = random.uniform(10.0, 20.0) log("INFO", f"☕️ 已经连续高强度翻了 {page_num} 页,触发风控规避机制,假装喝水休息 {long_delay:.2f} 秒...") time.sleep(long_delay) # 兼容多种 ElementUI 翻页按钮的特征 # 为了防止由于网络延迟导致的 DOM 元素短暂消失,我们加入重试机制 next_btn = None for _ in range(3): next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3) if next_btn: break time.sleep(1) # 【修复】当跳页页数大于 400 页时,某些页面的 ElementUI 分页组件会为了节省 DOM 而卸载 next_btn # 或者被包裹在隐藏容器里。如果在页面底部直接寻找带有 "btn-next" 且不包含 disabled 的按钮 if not next_btn: # 尝试备用定位方式:直接找右箭头图标所在的按钮 next_btn = page.ele('xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3) if not next_btn: log("ERR", "重试 3 次后仍然找不到下一页按钮,可能是页面崩溃或会话超时,尝试强制刷新页面...") page.refresh() page.wait.load_start() time.sleep(5) # 刷新后尝试重新找一次 next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=5) if not next_btn: log("ERR", "刷新后依然找不到下一页按钮,彻底中止。") break # 检查按钮是否被禁用 class_str = str(next_btn.attr("class")) aria_disabled = next_btn.attr("aria-disabled") is_disabled_attr = next_btn.attr("disabled") is not None if "disabled" in class_str or is_disabled_attr or aria_disabled == "true": log("OK", "🏁 下一页按钮已被禁用,说明已经到达最后一页!") break page_num += 1 log("INFO", f"正在点击【下一页】抓取第 {page_num} 页...") try: next_btn.click() except Exception as e: log("ERR", f"普通点击失败: {e},尝试 JS 点击...") page.run_js("arguments[0].click();", next_btn) # 等待新一页的 API 响应 packet = page.listen.wait(timeout=15) if not packet: log("ERR", f"第 {page_num} 页请求超时或未触发,中止抓取。") break body = packet.response.body data = body if isinstance(body, (dict, list)) else json.loads(body) if isinstance(data, dict) and "result" in data: items = data["result"].get("items", []) if not items: log("WARN", f"第 {page_num} 页返回了空列表,可能已无数据。") break for item in items: all_clean_items.append({ "采购订单号": item.get("purchaseOrderCode"), "行号": item.get("rowsNum"), "物料代码": item.get("materialCode"), "物料名称": item.get("materialName"), "物料规格": item.get("materialSpecification"), "仓库代码": item.get("warehouseCode"), "仓库名称": item.get("warehouseName"), "供应商代码": item.get("supplierCode"), "供应商名称": item.get("supplierName"), "单位名称": item.get("unitName"), "转换单位": item.get("convertUnitName"), "收货单价": item.get("receivePrice"), "收货时间": item.get("receiptTime"), "进货数量": item.get("plannedPurchaseQuantity"), "收货数量": item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity"), "收货总金额": item.get("receiveAmount") }) log("OK", f"第 {page_num} 页清洗完成,累计提取 {len(all_clean_items)} 条数据。") # 每 10 页自动保存一次,防止意外崩溃导致数据丢失 if page_num % 10 == 0: with open(SAVE_PATH, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("INFO", f"💾 自动存档: 已保存 {len(all_clean_items)} 条记录至本地。") else: log("ERR", f"第 {page_num} 页数据结构异常,中止。") break page.listen.stop() # ========================================================= # 最终保存 # ========================================================= if all_clean_items: with open(SAVE_PATH, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("OK", f"🎉 全部抓取完成!总计成功提取 {len(all_clean_items)} 条数据。") log("OK", f"数据已保存至: {SAVE_PATH}") except Exception as e: log("ERR", f"发生全局异常: {e}") # 异常时尝试抢救数据 if all_clean_items: rescue_path = OUTPUT_DIR / "receipt_details_RESCUE.json" with open(rescue_path, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("INFO", f"🆘 触发异常保存,抢救了 {len(all_clean_items)} 条数据。") finally: # 无论脚本正常结束还是异常退出,都强制停止监听,防止成为僵尸爬虫 try: page.listen.stop() log("INFO", "🛑 已释放浏览器监听资源。") except: pass if __name__ == "__main__": fetch_receipt_details_full()