268 lines
12 KiB
Python
268 lines
12 KiB
Python
"""
|
||
收货明细报表 - 智能增量同步脚本
|
||
目标:
|
||
1. 自动连接本地 SQLite 数据库查询当前存量。
|
||
2. 进入 ERP 系统截获第一页 API,提取系统总条数。
|
||
3. 精准计算需要跳转的起始页码,并在前端页面自动完成跳转。
|
||
4. 仅提取新增页面的数据,内存去重后插入 SQLite,绝不重复抓取历史数据。
|
||
"""
|
||
import sys
|
||
import json
|
||
import time
|
||
import subprocess
|
||
import math
|
||
import random
|
||
import sqlite3
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from login import get_page, log
|
||
from config import DB_PATH
|
||
|
||
HOME_URL = "https://yunmes.tftykj.cn/"
|
||
API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy"
|
||
|
||
def get_local_count(conn):
|
||
"""获取本地数据库已有的总记录数"""
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) FROM receipt_details")
|
||
return cursor.fetchone()[0]
|
||
|
||
def item_exists(cursor, item):
|
||
"""判断某条明细是否已在数据库中存在(基于采购单号+行号+物料代码组合判断)"""
|
||
po_code = item.get("purchaseOrderCode")
|
||
row_no = item.get("rowsNum")
|
||
mat_code = item.get("materialCode")
|
||
|
||
cursor.execute('''
|
||
SELECT 1 FROM receipt_details
|
||
WHERE purchase_order_code = ? AND row_no = ? AND material_code = ?
|
||
''', (po_code, row_no, mat_code))
|
||
return cursor.fetchone() is not None
|
||
|
||
def fetch_receipt_details_incremental():
|
||
log("INFO", "=== 🚀 启动收货明细报表 - 智能增量同步 ===")
|
||
|
||
if not DB_PATH.exists():
|
||
log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
|
||
return
|
||
|
||
conn = sqlite3.connect(DB_PATH)
|
||
local_count = get_local_count(conn)
|
||
log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")
|
||
|
||
page = get_page(port=9222)
|
||
|
||
try:
|
||
log("INFO", f"正在回到主页起点: {HOME_URL}")
|
||
page.get(HOME_URL)
|
||
page.wait.load_start()
|
||
time.sleep(2)
|
||
|
||
menus = [
|
||
("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'),
|
||
("第二层: 采购业务报表", 'xpath:/html/body/div[7]/div/div[1]/div/div[4]/div/p'),
|
||
("第三层: 收货明细报表", 'xpath:/html/body/div[8]/div/div[1]/div/div[4]/div/p')
|
||
]
|
||
|
||
log("INFO", "模拟点击左侧导航菜单...")
|
||
for name, xpath in menus:
|
||
ele = page.ele(xpath, timeout=5)
|
||
if ele:
|
||
try: ele.click()
|
||
except: page.run_js("arguments[0].click();", ele)
|
||
time.sleep(1.5)
|
||
else:
|
||
log("ERR", f"找不到菜单元素: {name}")
|
||
return
|
||
|
||
log("OK", "✅ 成功点开收货明细报表界面!")
|
||
|
||
# 隐藏菜单
|
||
blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div'
|
||
blank_ele = page.ele(blank_xpath, timeout=3)
|
||
if blank_ele:
|
||
try: blank_ele.click()
|
||
except: page.run_js("arguments[0].click();", blank_ele)
|
||
time.sleep(0.5)
|
||
|
||
log("INFO", f"开启底层数据拦截网: {API_TARGET}")
|
||
page.listen.start(API_TARGET)
|
||
|
||
packet = page.listen.wait(timeout=10)
|
||
if not packet:
|
||
query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span'
|
||
query_btn = page.ele(query_btn_xpath, timeout=3)
|
||
if query_btn:
|
||
try: query_btn.click()
|
||
except: page.run_js("arguments[0].click();", query_btn)
|
||
packet = page.listen.wait(timeout=15)
|
||
|
||
if not packet:
|
||
log("ERR", "未能拦截到第一页数据,无法获取线上总条数。")
|
||
return
|
||
|
||
body = packet.response.body
|
||
data = body if isinstance(body, (dict, list)) else json.loads(body)
|
||
|
||
remote_count = 0
|
||
if isinstance(data, dict) and "result" in data:
|
||
remote_count = data["result"].get("totalCount", 0)
|
||
|
||
log("INFO", f"🌐 线上 ERP 系统当前总条数: {remote_count} 条")
|
||
|
||
if remote_count <= local_count:
|
||
log("OK", "🎉 本地数据已是最新状态,无需抓取!")
|
||
return
|
||
|
||
new_items_count = remote_count - local_count
|
||
log("INFO", f"🔥 发现新增数据: {new_items_count} 条!准备进行增量跳页抓取...")
|
||
|
||
# 每页 50 条,计算应该从哪一页开始抓
|
||
# 例如: 本地有 37584 条,37584 // 50 = 751 页是满的,所以从第 752 页开始抓
|
||
start_page = math.floor(local_count / 50) + 1
|
||
end_page = math.ceil(remote_count / 50)
|
||
|
||
log("INFO", f"🎯 智能跳页计算完毕:直接跳转至第 {start_page} 页 (目标到 {end_page} 页)")
|
||
|
||
# 执行跳转
|
||
if start_page > 1:
|
||
jumper_input_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/span[3]/div/div//input'
|
||
input_ele = page.ele(jumper_input_xpath, timeout=5)
|
||
|
||
if not input_ele:
|
||
jumper_input_xpath = 'xpath://input[@type="number" and @aria-label="页"]'
|
||
input_ele = page.ele(jumper_input_xpath, timeout=5)
|
||
|
||
if input_ele:
|
||
input_ele.clear()
|
||
input_ele.input(str(start_page))
|
||
time.sleep(0.5)
|
||
input_ele.input('\n')
|
||
|
||
# 等待跳转后的数据响应
|
||
packet = page.listen.wait(timeout=15)
|
||
if not packet:
|
||
log("ERR", "跳转失败,未拦截到目标页的数据请求。")
|
||
return
|
||
log("OK", f"✅ 成功跳转至第 {start_page} 页并截获数据!")
|
||
else:
|
||
log("ERR", "找不到页码输入框,增量跳转失败!")
|
||
return
|
||
|
||
# =========================================================
|
||
# 开始处理新增页面的数据并入库
|
||
# =========================================================
|
||
current_page = start_page
|
||
cursor = conn.cursor()
|
||
total_inserted = 0
|
||
|
||
while current_page <= end_page:
|
||
body = packet.response.body
|
||
data = body if isinstance(body, (dict, list)) else json.loads(body)
|
||
|
||
inserted_this_page = 0
|
||
if isinstance(data, dict) and "result" in data:
|
||
items = data["result"].get("items", [])
|
||
|
||
for item in items:
|
||
po_code = item.get("purchaseOrderCode")
|
||
row_no = item.get("rowsNum")
|
||
mat_code = item.get("materialCode")
|
||
|
||
# 检查是否存在,如果存在则更新数量和金额,不存在则插入
|
||
cursor.execute('SELECT id FROM receipt_details WHERE purchase_order_code = ? AND row_no = ? AND material_code = ?', (po_code, row_no, mat_code))
|
||
existing_record = cursor.fetchone()
|
||
|
||
# 进货数量(件数)永远只取原始的 plannedPurchaseQuantity,不取转换后的
|
||
p_qty = item.get("plannedPurchaseQuantity")
|
||
r_qty = item.get("convertGoodsQuantity") if item.get("convertGoodsQuantity") is not None else item.get("goodsQuantity")
|
||
|
||
if existing_record:
|
||
cursor.execute('''
|
||
UPDATE receipt_details
|
||
SET purchase_qty = ?, receive_qty = ?, receive_price = ?, total_amount = ?
|
||
WHERE id = ?
|
||
''', (p_qty, r_qty, item.get("receivePrice"), item.get("receiveAmount"), existing_record[0]))
|
||
# 算作更新,为了记录日志
|
||
inserted_this_page += 1
|
||
else:
|
||
cursor.execute('''
|
||
INSERT INTO receipt_details (
|
||
purchase_order_code, row_no, material_code, material_name,
|
||
material_specification, warehouse_code, warehouse_name,
|
||
supplier_code, supplier_name, unit_name, conversion_unit,
|
||
receive_price, receipt_time,
|
||
purchase_qty, receive_qty, total_amount
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
''', (
|
||
po_code,
|
||
row_no,
|
||
mat_code,
|
||
item.get("materialName"),
|
||
item.get("materialSpecification"),
|
||
item.get("warehouseCode"),
|
||
item.get("warehouseName"),
|
||
item.get("supplierCode"),
|
||
item.get("supplierName"),
|
||
item.get("unitName"),
|
||
item.get("convertUnitName"),
|
||
item.get("receivePrice"),
|
||
item.get("receiptTime"),
|
||
p_qty,
|
||
r_qty,
|
||
item.get("receiveAmount")
|
||
))
|
||
inserted_this_page += 1
|
||
total_inserted += 1
|
||
|
||
conn.commit()
|
||
log("OK", f"第 {current_page} 页处理完毕,成功截获 {inserted_this_page} 条数据并存入数据库。")
|
||
|
||
# 还有下一页则继续点击
|
||
if current_page < end_page:
|
||
delay = random.uniform(1.5, 3.5)
|
||
log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
|
||
time.sleep(delay)
|
||
|
||
# 同步全量脚本的优化:重试机制与兼容的类名匹配
|
||
next_btn = None
|
||
for _ in range(3):
|
||
next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
|
||
if next_btn:
|
||
break
|
||
time.sleep(1)
|
||
|
||
# 备用定位方式:直接找右箭头图标所在的按钮
|
||
if not next_btn:
|
||
next_btn = page.ele('xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3)
|
||
|
||
if next_btn:
|
||
try: next_btn.click()
|
||
except: page.run_js("arguments[0].click();", next_btn)
|
||
|
||
packet = page.listen.wait(timeout=15)
|
||
if not packet:
|
||
log("ERR", f"第 {current_page + 1} 页请求超时!")
|
||
break
|
||
else:
|
||
log("ERR", "重试 3 次后仍然找不到下一页按钮!")
|
||
break
|
||
|
||
current_page += 1
|
||
|
||
log("OK", f"🎉 增量同步大功告成!总计向数据库执行了 {total_inserted} 次插入/更新操作!")
|
||
|
||
except Exception as e:
|
||
log("ERR", f"发生全局异常: {e}")
|
||
finally:
|
||
if 'conn' in locals() and conn:
|
||
conn.close()
|
||
if 'page' in locals() and page:
|
||
try:
|
||
page.listen.stop()
|
||
except Exception:
|
||
pass
|
||
|
||
if __name__ == "__main__":
|
||
fetch_receipt_details_incremental() |