"""
Receipt-details report - smart incremental sync script.

Goals (as stated by the original author):
1. Connect to the local SQLite database and query the current row count.
2. Open the ERP system, intercept the first-page API call and read the
   system-wide total.
3. Compute the starting page to jump to and perform the jump in the UI.
4. Extract only the newly added pages, de-duplicate and write to SQLite
   without re-fetching historical data.

NOTE(review): the part of this file between the JS "query button" fallback
and the start of the paging loop was lost in the copy this rewrite was made
from.  It has been rebuilt in the simplest form that is consistent with the
surrounding code (start at page 1, click "next" until it is disabled, upsert
every row).  If the original contained total-count / jump-to-page logic
(goals 2-3 above), restore it from version control.
"""
import sys
import json
import time
import subprocess
import math
import random
import sqlite3
import datetime
import calendar
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from login import get_page, log
from config import DB_PATH

HOME_URL = "https://yunmes.tftykj.cn/"
TARGET_URL = "https://yunmes.tftykj.cn/ReceiptDetailsCheckFinace"
API_TARGET = "ReceiptDetailsCheckFinace_SearchList"


def get_local_count(conn):
    """Return the number of rows currently stored in ``receipt_details``."""
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM receipt_details")
    return cursor.fetchone()[0]


def item_exists(cursor, item):
    """Return True if *item* is already stored.

    Identity is the combination of purchase-order code, row number and
    material code, read from the ``purchaseOrderCode``, ``rowsNum`` and
    ``materialCode`` keys of *item*.
    """
    po_code = item.get("purchaseOrderCode")
    row_no = item.get("rowsNum")
    mat_code = item.get("materialCode")
    cursor.execute('''
        SELECT 1 FROM receipt_details
        WHERE purchase_order_code = ? AND row_no = ? AND material_code = ?
    ''', (po_code, row_no, mat_code))
    return cursor.fetchone() is not None


def _current_month_range(today=None):
    """Return (first_day, last_day) of *today*'s month as 'YYYY-MM-DD' strings."""
    today = today or datetime.date.today()
    last_dom = calendar.monthrange(today.year, today.month)[1]
    first_day = datetime.date(today.year, today.month, 1).strftime('%Y-%m-%d')
    last_day = datetime.date(today.year, today.month, last_dom).strftime('%Y-%m-%d')
    return first_day, last_day


def _apply_date_filter(page, first_day, last_day):
    """Write the date range into the first two date inputs found on the page."""
    page.run_js(f"""
        try {{
            var dates = document.querySelectorAll('.datebox-f, .datetimebox-f, .el-date-editor input');
            if (dates.length >= 2) {{
                // 这里适配 ElementUI 或 EasyUI 的日期输入框
                dates[0].value = '{first_day}';
                dates[1].value = '{last_day}';
                // 触发 input 和 change 事件让 Vue/React 感知到值的改变
                dates[0].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[0].dispatchEvent(new Event('change', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}
        }} catch(e) {{ console.log(e); }}
    """)


def _click_query(page):
    """Click the page's "查询" (query) button; return True if a click was issued."""
    query_btn = page.ele('text=查询', timeout=3)
    if not query_btn:
        query_btn = page.ele('xpath://button[contains(., "查询")]', timeout=3)

    if query_btn:
        try:
            query_btn.click()
        except Exception:
            # Native click can fail when the button is covered; fall back to JS.
            page.run_js("arguments[0].click();", query_btn)
        return True

    log("WARN", "常规选择器找不到查询按钮,尝试使用全局 JS 强行寻找...")
    # Last resort: scan every button-like element and click the first one
    # whose text contains "查询".
    # NOTE(review): only the first two statements of this script survived in
    # the damaged source; the loop body below is a reconstruction.
    clicked = page.run_js("""
        var btns = document.querySelectorAll('button, a, .l-btn, .el-button');
        for (var i = 0; i < btns.length; i++) {
            var label = (btns[i].innerText || btns[i].textContent || '').trim();
            if (label.indexOf('查询') !== -1) {
                btns[i].click();
                return true;
            }
        }
        return false;
    """)
    return bool(clicked)


def _find_next_button(page):
    """Locate the pagination "next" button, or return a falsy value if absent."""
    next_btn = None
    for _ in range(3):
        # Prefer the "pagination-next" class; fall back to "btn-next".
        next_btn = page.ele('xpath://*[contains(@class, "pagination-next")]', timeout=3)
        if not next_btn:
            next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
        if next_btn:
            break
        time.sleep(1)

    if not next_btn:
        next_btn = page.ele(
            'xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button',
            timeout=3,
        )
    return next_btn


def _is_disabled(button):
    """Return True if *button* (or its direct parent) is marked as disabled."""
    class_str = str(button.attr("class"))
    aria_disabled = button.attr("aria-disabled")
    has_disabled_attr = button.attr("disabled") is not None

    # The button may be wrapped in an <li> or another container that carries
    # the "disabled" class instead of the button itself.
    parent_class_str = ""
    try:
        parent_class_str = str(button.parent().attr("class"))
    except Exception:
        # Best effort only: a missing parent must not abort paging.
        pass

    return (
        "disabled" in class_str
        or "disabled" in parent_class_str
        or has_disabled_attr
        or aria_disabled == "true"
    )


def _upsert_items(cursor, items):
    """Insert or update each API row; return ``(inserted, updated)`` counts."""
    inserted = 0
    updated = 0
    for item in items:
        po_code = item.get("purchaseOrderCode")
        row_no = item.get("rowsNum")
        mat_code = item.get("materialCode")

        # Existing rows are matched on purchase-order code + material code.
        # NOTE(review): item_exists() above also includes row_no in the key.
        # If one order can list the same material on several rows, this
        # two-column match updates the first row instead of inserting a new
        # one - confirm which key is intended.
        cursor.execute(
            'SELECT id FROM receipt_details WHERE purchase_order_code = ? AND material_code = ?',
            (po_code, mat_code),
        )
        existing_record = cursor.fetchone()

        p_qty = item.get("plannedPurchaseQuantity")
        # Prefer the converted quantity when the API supplies one.
        r_qty = item.get("convertGoodsQuantity")
        if r_qty is None:
            r_qty = item.get("goodsQuantity")

        if existing_record:
            cursor.execute('''
                UPDATE receipt_details
                SET purchase_qty = ?, receive_qty = ?, receive_price = ?, total_amount = ?
                WHERE id = ?
            ''', (p_qty, r_qty, item.get("receivePrice"), item.get("receiveAmount"),
                  existing_record[0]))
            updated += 1
        else:
            cursor.execute('''
                INSERT INTO receipt_details (
                    purchase_order_code, row_no, material_code, material_name,
                    material_specification, warehouse_code, warehouse_name,
                    supplier_code, supplier_name, unit_name, conversion_unit,
                    receive_price, receipt_time, purchase_qty, receive_qty, total_amount
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                po_code, row_no, mat_code,
                item.get("materialName"), item.get("materialSpecification"),
                item.get("warehouseCode"), item.get("warehouseName"),
                item.get("supplierCode"), item.get("supplierName"),
                item.get("unitName"), item.get("convertUnitName"),
                item.get("receivePrice"), item.get("receiptTime"),
                p_qty, r_qty, item.get("receiveAmount"),
            ))
            inserted += 1
    return inserted, updated


def _sync_pages(page, conn):
    """Log in, open the report, query the current month and upsert every page."""
    # Imported here, as in the original, so a failure is handled by the
    # caller's error handler rather than at module import time.
    from login import login

    log("INFO", "检查登录状态并执行登录...")
    if not login(page):
        log("ERR", "登录 ERP 系统失败,中止抓取。")
        return

    log("INFO", f"正在直接访问目标页面: {TARGET_URL}")
    page.get(TARGET_URL)
    page.wait.load_start()
    time.sleep(2)

    # Wait for the data table to appear.  The original locator mixed a CSS
    # class selector into an XPath union ("//table | .el-table__body"), which
    # is not valid XPath; both alternatives are now expressed in XPath.
    table = page.ele(
        'xpath://table | //*[contains(@class, "el-table__body")]', timeout=15
    )
    if table:
        log("OK", "✅ 成功打开财务收货明细报表界面!")
    else:
        log("WARN", "表格元素未找到,继续执行")

    log("INFO", f"开启底层数据拦截网: {API_TARGET}")
    page.listen.start(API_TARGET)

    # Force the query range to the current calendar month.
    first_day, last_day = _current_month_range()
    log("INFO", f"正在自动设置查询时间范围: {first_day} 至 {last_day}")
    _apply_date_filter(page, first_day, last_day)
    time.sleep(1)

    log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
    # If no button could be clicked, the packet wait below times out and
    # reports the failure.
    _click_query(page)

    # NOTE(review): the loop set-up below (cursor, counters, starting page,
    # `while True`) is reconstructed - the original lines were lost.
    cursor = conn.cursor()
    total_inserted = 0
    total_updated = 0
    current_page = 1

    while True:
        if current_page > 1:
            delay = random.uniform(1.5, 3.5)
            log("INFO", f"⏳ 停顿 {delay:.2f} 秒后准备获取第 {current_page} 页...")
            time.sleep(delay)

            next_btn = _find_next_button(page)
            if not next_btn:
                log("ERR", "找不到下一页按钮,可能页面异常或已到底部,停止抓取。")
                break

            if _is_disabled(next_btn):
                log("OK", "🏁 下一页按钮已被禁用,说明已经到达最后一页!")
                break

            try:
                # JS click first: the pager is sometimes covered by overlays.
                page.run_js("arguments[0].click();", next_btn)
            except Exception as e:
                log("ERR", f"JS 点击下一页失败: {e},尝试普通点击...")
                next_btn.click()

        packet = page.listen.wait(timeout=15)
        if not packet:
            log("ERR", f"第 {current_page} 页请求超时或未触发,中止抓取。")
            break

        body = packet.response.body
        data = body if isinstance(body, (dict, list)) else json.loads(body)

        result = data.get("result") if isinstance(data, dict) else None
        if not isinstance(result, dict):
            log("ERR", f"第 {current_page} 页数据结构异常。")
            break

        # `items` may be present but null; treat that as an empty page.
        inserted_this_page, updated_this_page = _upsert_items(
            cursor, result.get("items") or []
        )
        total_inserted += inserted_this_page
        total_updated += updated_this_page
        conn.commit()
        log("OK", f"第 {current_page} 页处理完毕,新增 {inserted_this_page} 条,更新 {updated_this_page} 条。")

        current_page += 1

    log("OK", f"🎉 增量抓取全部结束!总计新增 {total_inserted} 条,更新 {total_updated} 条。")


def fetch_receipt_details_incremental():
    """Synchronise the current month's receipt details from the ERP into SQLite.

    Returns None.  Errors raised while scraping are logged and swallowed;
    errors raised while opening the database or obtaining the browser page
    propagate to the caller, as before, but no longer leak the connection.
    """
    log("INFO", "=== 🚀 启动收货明细报表 - 智能增量同步 ===")

    if not DB_PATH.exists():
        log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
        return

    conn = sqlite3.connect(DB_PATH)
    page = None
    try:
        local_count = get_local_count(conn)
        log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")

        page = get_page(port=9222)
        try:
            _sync_pages(page, conn)
        except Exception as e:
            log("ERR", f"发生全局异常: {e}")
    finally:
        conn.close()
        if page is not None:
            try:
                page.listen.stop()
            except Exception:
                pass
            try:
                # The browser must be shut down to free memory and avoid
                # zombie processes that later cause 404s.
                page.quit()
            except Exception:
                pass


if __name__ == "__main__":
    fetch_receipt_details_incremental()