""" 收货明细报表 - 全量分页抓取 (精简字段模式) 目标: 模拟点击菜单,过滤 11 个核心字段,并循环点击下一页,直到所有数据抓取完毕。 """ import sys import json import time import random from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from login import get_page, log from config import OUTPUT_DIR HOME_URL = "https://yunmes.tftykj.cn/" API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy" SAVE_PATH = OUTPUT_DIR / "receipt_details_full_clean.json" def fetch_receipt_details_full(): log("INFO", "=== 🚚 启动收货明细报表全量抓取 (精简字段模式) ===") page = get_page(port=9222) all_clean_items = [] try: log("INFO", f"正在回到主页起点: {HOME_URL}") page.get(HOME_URL) page.wait.load_start() time.sleep(2) menus = [ ("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'), ("第二层: 采购业务报表", 'xpath:/html/body/div[7]/div/div[1]/div/div[4]/div/p'), ("第三层: 收货明细报表", 'xpath:/html/body/div[8]/div/div[1]/div/div[4]/div/p') ] log("INFO", "开始模拟人工点击左侧导航菜单...") for name, xpath in menus: ele = page.ele(xpath, timeout=5) if ele: try: ele.click() except: page.run_js("arguments[0].click();", ele) time.sleep(1.5) else: log("ERR", f"找不到菜单元素: {name}") return log("OK", "✅ 成功点开收货明细报表界面!") # 点击空白处隐藏菜单 blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div' blank_ele = page.ele(blank_xpath, timeout=3) if blank_ele: try: blank_ele.click() except: page.run_js("arguments[0].click();", blank_ele) time.sleep(0.5) log("INFO", f"开启底层数据拦截网: {API_TARGET}") page.listen.start(API_TARGET) packet = page.listen.wait(timeout=10) if not packet: log("INFO", "尝试寻找并点击页面上的【查询】按钮...") query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span' query_btn = page.ele(query_btn_xpath, timeout=3) if query_btn: try: query_btn.click() except: page.run_js("arguments[0].click();", query_btn) packet = page.listen.wait(timeout=15) if not packet: log("ERR", "未能拦截到第一页数据,可能网络超时或查询未触发。") page.listen.stop() return # ========================================================= # 第一页数据处理 # ========================================================= log("OK", f"🎉 成功拦截到第一页数据!HTTP: {packet.response.status}") body = packet.response.body data = body if isinstance(body, (dict, list)) else json.loads(body) total_count = 0 if isinstance(data, dict) and "result" in data: total_count = data["result"].get("totalCount", 0) items = data["result"].get("items", []) for item in items: all_clean_items.append({ "采购订单号": item.get("purchaseOrderCode"), "行号": item.get("rowsNum"), "物料代码": item.get("materialCode"), "物料名称": item.get("materialName"), "物料规格": item.get("materialSpecification"), "仓库代码": item.get("warehouseCode"), "仓库名称": item.get("warehouseName"), "供应商代码": item.get("supplierCode"), "供应商名称": item.get("supplierName"), "单位名称": item.get("unitName"), "转换单位": item.get("convertUnitName"), "收货单价": item.get("receivePrice"), "收货时间": item.get("receiptTime"), "进货数量": item.get("convertPlannedPurchaseQuantity") if item.get("convertPlannedPurchaseQuantity") is not None else item.get("plannedPurchaseQuantity"), "收货数量": item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity"), "收货总金额": item.get("receiveAmount") }) log("OK", f"第一页清洗完成,提取了 {len(items)} 条数据。后端报告总条数: {total_count}") page_num = 1 # ========================================================= # 循环翻页抓取 # ========================================================= next_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/button[2]' while True: # 引入“类人”随机延迟(2.5 秒到 5.5 秒之间随机) delay = random.uniform(2.5, 5.5) log("INFO", f"⏳ 模拟真人停顿 {delay:.2f} 秒后,准备点击下一页...") time.sleep(delay) # 偶尔的“长休息”(模拟用户看累了或者喝口水),每抓 50 页额外休息 10-20 秒 if page_num > 1 and page_num % 50 == 0: long_delay = random.uniform(10.0, 20.0) log("INFO", f"☕️ 已经连续高强度翻了 {page_num} 页,触发风控规避机制,假装喝水休息 {long_delay:.2f} 秒...") time.sleep(long_delay) next_btn = page.ele(next_btn_xpath, timeout=5) if not next_btn: log("ERR", "找不到下一页按钮,翻页中止。") break # 检查按钮是否被禁用 class_str = str(next_btn.attr("class")) aria_disabled = next_btn.attr("aria-disabled") is_disabled_attr = next_btn.attr("disabled") is not None if "disabled" in class_str or is_disabled_attr or aria_disabled == "true": log("OK", "🏁 下一页按钮已被禁用,说明已经到达最后一页!") break page_num += 1 log("INFO", f"正在点击【下一页】抓取第 {page_num} 页...") try: next_btn.click() except Exception as e: log("ERR", f"普通点击失败: {e},尝试 JS 点击...") page.run_js("arguments[0].click();", next_btn) # 等待新一页的 API 响应 packet = page.listen.wait(timeout=15) if not packet: log("ERR", f"第 {page_num} 页请求超时或未触发,中止抓取。") break body = packet.response.body data = body if isinstance(body, (dict, list)) else json.loads(body) if isinstance(data, dict) and "result" in data: items = data["result"].get("items", []) if not items: log("WARN", f"第 {page_num} 页返回了空列表,可能已无数据。") break for item in items: all_clean_items.append({ "采购订单号": item.get("purchaseOrderCode"), "行号": item.get("rowsNum"), "物料代码": item.get("materialCode"), "物料名称": item.get("materialName"), "物料规格": item.get("materialSpecification"), "仓库代码": item.get("warehouseCode"), "仓库名称": item.get("warehouseName"), "供应商代码": item.get("supplierCode"), "供应商名称": item.get("supplierName"), "单位名称": item.get("unitName"), "转换单位": item.get("convertUnitName"), "收货单价": item.get("receivePrice"), "收货时间": item.get("receiptTime"), "进货数量": item.get("plannedPurchaseQuantity"), "收货数量": item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity"), "收货总金额": item.get("receiveAmount") }) log("OK", f"第 {page_num} 页清洗完成,累计提取 {len(all_clean_items)} 条数据。") # 每 10 页自动保存一次,防止意外崩溃导致数据丢失 if page_num % 10 == 0: with open(SAVE_PATH, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("INFO", f"💾 自动存档: 已保存 {len(all_clean_items)} 条记录至本地。") else: log("ERR", f"第 {page_num} 页数据结构异常,中止。") break page.listen.stop() # ========================================================= # 最终保存 # ========================================================= if all_clean_items: with open(SAVE_PATH, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("OK", f"🎉 全部抓取完成!总计成功提取 {len(all_clean_items)} 条数据。") log("OK", f"数据已保存至: {SAVE_PATH}") except Exception as e: log("ERR", f"发生全局异常: {e}") # 异常时尝试抢救数据 if all_clean_items: rescue_path = OUTPUT_DIR / "receipt_details_RESCUE.json" with open(rescue_path, "w", encoding="utf-8") as f: json.dump(all_clean_items, f, ensure_ascii=False, indent=2) log("INFO", f"🆘 触发异常保存,抢救了 {len(all_clean_items)} 条数据。") if __name__ == "__main__": fetch_receipt_details_full()