"""
Receipt-detail report: incremental sync script.

Goals:
1. Query the local SQLite database for the number of rows already stored.
2. Open the ERP report and intercept the first-page API response to read the
   system-wide total row count.
3. Compute the first page that can contain new rows and jump to it through
   the front-end pager.
4. Read only those pages and write each row to SQLite, updating rows whose
   key (purchase order code + row number + material code) already exists and
   inserting the rest.
"""
import sys
import json
import time
import math
import random
import sqlite3
from pathlib import Path

# Make the sibling modules importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import DB_PATH

HOME_URL = "https://yunmes.tftykj.cn/"
API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy"
# Rows per page assumed by the page arithmetic below.
PAGE_SIZE = 50


def get_local_count(conn):
    """Return the number of rows currently stored in ``receipt_details``."""
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM receipt_details")
    return cursor.fetchone()[0]


def _find_record(cursor, item):
    """Return the ``(id,)`` row matching the item's key, or None if absent.

    The key is purchase order code + row number + material code.
    """
    cursor.execute(
        '''
        SELECT id FROM receipt_details
        WHERE purchase_order_code = ? AND row_no = ? AND material_code = ?
        ''',
        (
            item.get("purchaseOrderCode"),
            item.get("rowsNum"),
            item.get("materialCode"),
        ),
    )
    return cursor.fetchone()


def item_exists(cursor, item):
    """Return True when a row with the item's key is already in the database."""
    return _find_record(cursor, item) is not None


def _click(page, ele):
    """Click *ele* natively, falling back to a JavaScript click on failure."""
    try:
        ele.click()
    except Exception:
        page.run_js("arguments[0].click();", ele)


def _parse_body(packet):
    """Return the intercepted response body as parsed JSON.

    The body is used as-is when it is already a dict or list; otherwise it is
    decoded with ``json.loads``.
    """
    body = packet.response.body
    return body if isinstance(body, (dict, list)) else json.loads(body)


def _open_report(page):
    """Click through the navigation menus to the receipt-detail report.

    Returns True on success, False when a menu element cannot be found.
    """
    menus = [
        ("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'),
        ("第二层: 采购业务报表", 'xpath:/html/body/div[7]/div/div[1]/div/div[4]/div/p'),
        ("第三层: 收货明细报表", 'xpath:/html/body/div[8]/div/div[1]/div/div[4]/div/p'),
    ]

    log("INFO", "模拟点击左侧导航菜单...")
    for name, xpath in menus:
        ele = page.ele(xpath, timeout=5)
        # Truthiness (not ``is None``) is deliberate: it mirrors how the rest
        # of this script tests lookup results.
        if not ele:
            log("ERR", f"找不到菜单元素: {name}")
            return False
        _click(page, ele)
        time.sleep(1.5)

    log("OK", "✅ 成功点开收货明细报表界面!")

    # Click a blank area so the menu overlay is dismissed.
    blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div'
    blank_ele = page.ele(blank_xpath, timeout=3)
    if blank_ele:
        _click(page, blank_ele)
        time.sleep(0.5)
    return True


def _capture_first_page(page):
    """Start listening for the report API and return the first packet.

    If nothing arrives within 10 seconds, the query button is clicked to
    trigger a request and the listener is awaited once more. The result is
    falsy when no packet could be captured.
    """
    log("INFO", f"开启底层数据拦截网: {API_TARGET}")
    page.listen.start(API_TARGET)

    packet = page.listen.wait(timeout=10)
    if not packet:
        query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span'
        query_btn = page.ele(query_btn_xpath, timeout=3)
        if query_btn:
            _click(page, query_btn)
            packet = page.listen.wait(timeout=15)
    return packet


def _jump_to_page(page, start_page):
    """Type *start_page* into the pager's jump box and return the new packet.

    Returns None (after logging the reason) when the input box cannot be found
    or no API response is intercepted after the jump.
    """
    jumper_input_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/span[3]/div/div//input'
    input_ele = page.ele(jumper_input_xpath, timeout=5)
    if not input_ele:
        # Fallback locator that does not depend on the exact DOM path.
        jumper_input_xpath = 'xpath://input[@type="number" and @aria-label="页"]'
        input_ele = page.ele(jumper_input_xpath, timeout=5)

    if not input_ele:
        log("ERR", "找不到页码输入框,增量跳转失败!")
        return None

    input_ele.clear()
    input_ele.input(str(start_page))
    time.sleep(0.5)
    input_ele.input('\n')

    # Wait for the data request issued by the jump.
    packet = page.listen.wait(timeout=15)
    if not packet:
        log("ERR", "跳转失败,未拦截到目标页的数据请求。")
        return None
    log("OK", f"✅ 成功跳转至第 {start_page} 页并截获数据!")
    return packet


def _upsert_item(cursor, item):
    """Update the row matching the item's key, or insert a new one.

    Returns True when a new row was inserted, False when an existing row was
    updated.
    """
    existing_record = _find_record(cursor, item)

    # The purchase quantity always comes from the raw plannedPurchaseQuantity,
    # never from a converted value.
    p_qty = item.get("plannedPurchaseQuantity")
    # Prefer the converted received quantity; fall back to the raw one.
    converted_qty = item.get("convertGoodsQuantity")
    r_qty = converted_qty if converted_qty is not None else item.get("goodsQuantity")

    if existing_record:
        cursor.execute(
            '''
            UPDATE receipt_details
            SET purchase_qty = ?, receive_qty = ?, receive_price = ?, total_amount = ?
            WHERE id = ?
            ''',
            (
                p_qty,
                r_qty,
                item.get("receivePrice"),
                item.get("receiveAmount"),
                existing_record[0],
            ),
        )
        return False

    cursor.execute(
        '''
        INSERT INTO receipt_details (
            purchase_order_code, row_no, material_code, material_name,
            material_specification, warehouse_code, warehouse_name,
            supplier_code, supplier_name, unit_name, conversion_unit,
            receive_price, receipt_time, purchase_qty, receive_qty, total_amount
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''',
        (
            item.get("purchaseOrderCode"),
            item.get("rowsNum"),
            item.get("materialCode"),
            item.get("materialName"),
            item.get("materialSpecification"),
            item.get("warehouseCode"),
            item.get("warehouseName"),
            item.get("supplierCode"),
            item.get("supplierName"),
            item.get("unitName"),
            item.get("convertUnitName"),
            item.get("receivePrice"),
            item.get("receiptTime"),
            p_qty,
            r_qty,
            item.get("receiveAmount"),
        ),
    )
    return True


def _store_page(conn, cursor, data):
    """Write one page of API data to the database and commit it.

    Returns ``(processed, inserted)``: the number of rows written (updates
    plus inserts) and the number of brand-new rows among them. A payload
    without a ``result`` key is skipped and yields ``(0, 0)``.
    """
    if not (isinstance(data, dict) and "result" in data):
        return 0, 0

    processed = 0
    inserted = 0
    for item in data["result"].get("items", []):
        if _upsert_item(cursor, item):
            inserted += 1
        # Updates are counted too so the per-page log reflects every write.
        processed += 1
    conn.commit()
    return processed, inserted


def _find_next_button(page):
    """Locate the pager's "next" button, retrying up to three times.

    Falls back to the button wrapping the right-arrow icon. The result is
    falsy when neither locator matches.
    """
    for _ in range(3):
        next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
        if next_btn:
            return next_btn
        time.sleep(1)
    # Fallback locator: the button that contains the right-arrow icon.
    return page.ele('xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3)


def _sync_pages(page, conn, packet, start_page, end_page):
    """Store pages *start_page*..*end_page*, starting from *packet*.

    Returns ``(total_inserted, completed)``. ``completed`` is False when the
    walk stopped early because the next-page button was missing or a page
    request timed out.
    """
    cursor = conn.cursor()
    total_inserted = 0
    current_page = start_page

    while current_page <= end_page:
        processed, inserted = _store_page(conn, cursor, _parse_body(packet))
        total_inserted += inserted
        log("OK", f"第 {current_page} 页处理完毕,成功入库 {processed} 条新数据。")

        if current_page < end_page:
            # Random pause between page turns.
            delay = random.uniform(1.5, 3.5)
            log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
            time.sleep(delay)

            next_btn = _find_next_button(page)
            if not next_btn:
                log("ERR", "重试 3 次后仍然找不到下一页按钮!")
                return total_inserted, False

            _click(page, next_btn)
            packet = page.listen.wait(timeout=15)
            if not packet:
                log("ERR", f"第 {current_page + 1} 页请求超时!")
                return total_inserted, False

        current_page += 1

    return total_inserted, True


def _run_sync(page, conn, local_count):
    """Drive the browser through the report and store the new pages."""
    log("INFO", f"正在回到主页起点: {HOME_URL}")
    page.get(HOME_URL)
    page.wait.load_start()
    time.sleep(2)

    if not _open_report(page):
        return

    packet = _capture_first_page(page)
    if not packet:
        log("ERR", "未能拦截到第一页数据,无法获取线上总条数。")
        return

    data = _parse_body(packet)
    remote_count = 0
    if isinstance(data, dict) and "result" in data:
        remote_count = data["result"].get("totalCount", 0)

    log("INFO", f"🌐 线上 ERP 系统当前总条数: {remote_count} 条")

    if remote_count <= local_count:
        log("OK", "🎉 本地数据已是最新状态,无需抓取!")
        return

    new_items_count = remote_count - local_count
    log("INFO", f"🔥 发现新增数据: {new_items_count} 条!准备进行增量跳页抓取...")

    # Work out which page to start from. Example: with 37584 local rows,
    # 37584 // 50 = 751 pages are already full, so start at page 752.
    start_page = local_count // PAGE_SIZE + 1
    end_page = math.ceil(remote_count / PAGE_SIZE)

    log("INFO", f"🎯 智能跳页计算完毕:直接跳转至第 {start_page} 页 (目标到 {end_page} 页)")

    # Page 1 was already captured above; any other start page needs a jump.
    if start_page > 1:
        packet = _jump_to_page(page, start_page)
        if not packet:
            return

    total_inserted, completed = _sync_pages(page, conn, packet, start_page, end_page)
    if completed:
        log("OK", f"🎉 增量同步大功告成!总计入库 {total_inserted} 条全新数据!")
    else:
        # Do not claim success when the page walk stopped early.
        log("ERR", f"增量同步提前中止,已入库 {total_inserted} 条全新数据。")


def fetch_receipt_details_incremental():
    """Run one incremental sync of the receipt-detail report into SQLite.

    Returns None. The function logs and returns early when the database file
    is missing. Errors raised while opening the database, counting local rows
    or attaching to the browser propagate to the caller. Errors raised during
    the browser-driven sync are logged and swallowed. The database connection
    is closed in every case.
    """
    log("INFO", "=== 🚀 启动收货明细报表 - 智能增量同步 ===")

    if not DB_PATH.exists():
        log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
        return

    conn = sqlite3.connect(DB_PATH)
    try:
        local_count = get_local_count(conn)
        log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")

        page = get_page(port=9222)
        try:
            _run_sync(page, conn, local_count)
        except Exception as e:
            log("ERR", f"发生全局异常: {e}")
        finally:
            if page:
                # Best-effort: the listener may never have been started.
                try:
                    page.listen.stop()
                except Exception:
                    pass
    finally:
        conn.close()


if __name__ == "__main__":
    fetch_receipt_details_incremental()