Files
datie-bom/browser_login/fetch_issue_receipt_incremental.py
2026-06-12 16:30:57 +08:00

320 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
发料单报表 - 智能增量同步脚本 (从第一页开始抓,遇到旧数据即停)
目标:
1. 自动连接本地 SQLite 数据库查询是否存在某条记录。
2. 进入 ERP 系统截获发料单数据,由于新数据都在第一页,我们从第 1 页开始抓。
3. 逐条对比,如果发现某页的数据在本地已经存在,则认为增量部分已经抓取完毕,提前终止。
4. 将新增数据存入 SQLite。
"""
import sys
import json
import time
import math
import random
import sqlite3
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import DB_PATH
# Root URL of the ERP/MES web application.
# NOTE(review): not referenced anywhere else in the visible part of this file.
HOME_URL = "https://yunmes.tftykj.cn/"
# Target handed to page.listen.start() so the browser listener captures the
# responses of the issue-receipt list request.
API_TARGET = "WorkOrdersDetailed_SearchListAll_Proxy"
def get_local_count(conn):
    """Return how many rows the local ``issue_receipt_details`` table holds.

    ``conn`` is an open database connection exposing ``cursor()``.
    """
    count_query = "SELECT COUNT(*) FROM issue_receipt_details"
    counter = conn.cursor()
    counter.execute(count_query)
    first_row = counter.fetchone()
    return first_row[0]
def item_exists(cursor, item):
    """Tell whether an issue-receipt detail row is already stored locally.

    A row is identified by the combination of issue-receipt number
    (``workOrdersNumber``), line number (``lineNumber``) and material code
    (``materialCode``).

    Args:
        cursor: Database cursor used to run the lookup.
        item: Raw record (mapping) as returned by the remote API.

    Returns:
        ``True`` when a row with the same three-part key exists in
        ``issue_receipt_details``; ``False`` otherwise, including when any
        part of the key is missing (``None`` or an empty string).
    """
    key = (
        item.get("workOrdersNumber"),
        item.get("lineNumber"),
        item.get("materialCode"),
    )
    # Only a genuinely missing key part disables the duplicate check. A plain
    # truthiness test would also reject a legitimate line number of 0, which
    # would make such rows look "new" and get re-inserted on every sync.
    if any(part is None or part == "" for part in key):
        return False
    cursor.execute(
        '''
        SELECT 1 FROM issue_receipt_details
        WHERE work_orders_number = ? AND line_number = ? AND material_code = ?
        ''',
        key,
    )
    return cursor.fetchone() is not None
def _extract_fields(item):
    """Project a raw API record onto the labelled fields this script stores.

    Each output label is looked up with ``item.get`` on the corresponding API
    key, so keys absent from ``item`` come back as ``None``. The returned dict
    always carries every label, in the order listed below.
    """
    label_to_api_key = (
        ("生产任务单号", "productionOrderNo"),
        ("生产物料代码", "productMaterialCode"),
        ("生产物料名称", "productMaterialName"),
        ("生产物料规格", "productMaterialSpecification"),
        ("发料单号", "workOrdersNumber"),
        ("状态", "status"),
        ("物料规格", "materialSpecification"),
        ("物料名称", "materialName"),
        ("物料代码", "materialCode"),
        ("发料数量", "issueNumber"),
        ("已发料数量", "hasIssueNumber"),
        ("金额", "amount"),
        ("成本价", "costPrice"),
        ("发料金额", "issueAmount"),
        ("生产订单备注", "productionOrderRemark"),
        ("明细备注", "detailedRemark"),
        ("单位名称", "unitName"),
        ("仓库名称", "warehouseName"),
        ("行号", "lineNumber"),
        ("发料单备注", "workOrdersRemark"),
        ("执行人名称", "executorUserName"),
        ("物料型号", "materialModel"),
        ("执行时间", "executionTime"),
        ("领料人", "materialsUserName"),
        ("生产物料型号", "productMaterialModel"),
        ("自定义字段", "customField"),
        ("部门代码", "departmentInformationCode"),
        ("部门名称", "departmentInformationName"),
        ("图片文件", "imageFile"),
        ("汇总金额", "issueAmountTotal"),
        ("物料组代码", "materialGroupCode"),
        ("物料组名称", "materialGroupName"),
        # The API key spellings below are reproduced exactly as the script reads them.
        ("单价小数位数", "numnberOfReservedDigits"),
        ("单价进位策略", "placeMentStrategy"),
        ("单价", "price"),
        ("销售订单号", "salesOrderCode"),
    )
    return {label: item.get(api_key) for label, api_key in label_to_api_key}
# INSERT statement for one issue-receipt detail row. The column order must
# stay aligned with _INSERT_LABELS below (36 columns, 36 placeholders).
_INSERT_SQL = '''
    INSERT INTO issue_receipt_details (
        production_order_no, product_material_code, product_material_name, product_material_specification,
        work_orders_number, status, material_specification, material_name, material_code,
        issue_number, has_issue_number, amount, cost_price, issue_amount,
        production_order_remark, detailed_remark, unit_name, warehouse_name, line_number,
        work_orders_remark, executor_user_name, material_model, execution_time, materials_user_name,
        product_material_model, custom_field, department_information_code, department_information_name,
        image_file, issue_amount_total, material_group_code, material_group_name,
        numnber_of_reserved_digits, place_ment_strategy, price, sales_order_code
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
    )
'''

# Labels (keys of the dict produced by _extract_fields) in the same order as
# the columns named in _INSERT_SQL.
_INSERT_LABELS = (
    "生产任务单号", "生产物料代码", "生产物料名称", "生产物料规格",
    "发料单号", "状态", "物料规格", "物料名称", "物料代码",
    "发料数量", "已发料数量", "金额", "成本价", "发料金额",
    "生产订单备注", "明细备注", "单位名称", "仓库名称", "行号",
    "发料单备注", "执行人名称", "物料型号", "执行时间", "领料人",
    "生产物料型号", "自定义字段", "部门代码", "部门名称",
    "图片文件", "汇总金额", "物料组代码", "物料组名称",
    "单价小数位数", "单价进位策略", "单价", "销售订单号",
)


def _decode_body(body):
    """Return an intercepted response body as parsed JSON, parsing only if it is not already a dict/list."""
    return body if isinstance(body, (dict, list)) else json.loads(body)


def _insert_item(cursor, raw_item):
    """Insert one raw API record into ``issue_receipt_details`` (no commit)."""
    item = _extract_fields(raw_item)
    cursor.execute(_INSERT_SQL, tuple(item.get(label) for label in _INSERT_LABELS))


def _set_current_month_range(page):
    """Fill the page's first two date inputs with the first and last day of the current month."""
    import calendar
    import datetime

    today = datetime.datetime.now()
    first_day = datetime.date(today.year, today.month, 1).strftime('%Y-%m-%d')
    last_day = datetime.date(
        today.year, today.month, calendar.monthrange(today.year, today.month)[1]
    ).strftime('%Y-%m-%d')
    # The two dates used to be logged fused together; keep them readable.
    log("INFO", f"正在自动设置查询时间范围: {first_day} ~ {last_day}")
    page.run_js(f"""
        try {{
            var dates = document.querySelectorAll('.datebox-f, .datetimebox-f, .el-date-editor input');
            if (dates.length >= 2) {{
                dates[0].value = '{first_day}';
                dates[1].value = '{last_day}';
                dates[0].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[0].dispatchEvent(new Event('change', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}
        }} catch(e) {{ console.log(e); }}
    """)


def _trigger_query(page):
    """Click the page's query button, falling back to a JS search when selectors fail."""
    log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
    query_btn = page.ele('text=查询', timeout=3)
    if not query_btn:
        query_btn = page.ele('xpath://button[contains(., "查询")]', timeout=3)
    if query_btn:
        try:
            query_btn.click()
        except Exception:
            # Native click failed; fall back to a JS click.
            page.run_js("arguments[0].click();", query_btn)
        return
    log("WARN", "常规选择器找不到查询按钮,尝试使用全局 JS 强行寻找...")
    page.run_js("""
        var btns = document.querySelectorAll('button, a, .l-btn, .el-button');
        for(var i=0; i<btns.length; i++) {
            if(btns[i].innerText && btns[i].innerText.indexOf('查询') !== -1) {
                btns[i].click();
                break;
            }
        }
    """)


def _find_next_button(page):
    """Locate the pagination "next" button, trying up to three times plus one last fallback selector."""
    next_btn = None
    for _ in range(3):
        # Prefer the "pagination-next" class; fall back to "btn-next".
        next_btn = page.ele('xpath://*[contains(@class, "pagination-next")]', timeout=3)
        if not next_btn:
            next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
        if next_btn:
            break
        time.sleep(1)
    if not next_btn:
        next_btn = page.ele(
            'xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3
        )
    return next_btn


def _is_disabled(next_btn):
    """Return True when the button, or its parent element, is marked as disabled."""
    class_str = str(next_btn.attr("class"))
    aria_disabled = next_btn.attr("aria-disabled")
    is_disabled_attr = next_btn.attr("disabled") is not None
    parent_class_str = ""
    try:
        parent_class_str = str(next_btn.parent().attr("class"))
    except Exception:
        # Best effort: a missing/unreadable parent simply contributes nothing.
        pass
    return (
        "disabled" in class_str
        or "disabled" in parent_class_str
        or is_disabled_attr
        or aria_disabled == "true"
    )


def fetch_issue_receipt_incremental():
    """Incrementally sync issue-receipt detail rows from the ERP page into SQLite.

    Flow:
        1. Abort if the database file at ``DB_PATH`` does not exist.
        2. Open the issue-receipt query page in the browser obtained from
           ``get_page(port=9222)``, listen for ``API_TARGET`` responses, set the
           date range to the current month and trigger the query if no response
           was captured on its own.
        3. Compare the remote ``totalCount`` with the local row count and stop
           early when they are equal.
        4. Walk the result pages from page 1, inserting every record that
           ``item_exists`` does not find locally and committing once per page.
           Paging stops on an empty page, a malformed payload, a page after
           the first that yields no new rows, a disabled/missing "next"
           button, or a response timeout.

    Returns:
        None. Progress and failures are reported through ``log``; exceptions
        raised while driving the page are caught and logged.
    """
    log("INFO", "=== 🚀 启动发料单报表 - 智能增量同步 (首屏更新模式) ===")
    if not DB_PATH.exists():
        log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
        return

    conn = sqlite3.connect(DB_PATH)
    try:
        local_count = get_local_count(conn)
        log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")
        page = get_page(port=9222)
        try:
            log("INFO", "正在直接跳转到发料单明细页面...")
            page.get("https://yunmes.tftykj.cn/WorkOrdersQuery")
            page.wait.load_start()
            time.sleep(2)

            log("INFO", f"开启底层数据拦截网: {API_TARGET}")
            page.listen.start(API_TARGET)

            # Force the query window to the current month.
            _set_current_month_range(page)
            time.sleep(1)

            # Opening the URL directly usually does not load data by itself,
            # so click the query button when nothing was intercepted.
            packet = page.listen.wait(timeout=10)
            if not packet:
                _trigger_query(page)
                packet = page.listen.wait(timeout=15)
            if not packet:
                log("ERR", "未能拦截到第一页数据,无法获取线上总条数。")
                return

            payload = _decode_body(packet.response.body)
            remote_count = 0
            if isinstance(payload, dict) and "result" in payload:
                remote_count = payload["result"].get("totalCount", 0)
            log("INFO", f"🌐 线上 ERP 系统当前总条数: {remote_count}")

            # NOTE(review): remote_count presumably reflects the current-month
            # filter set above, while local_count covers the whole table --
            # confirm these two numbers are meant to be comparable.
            if remote_count == local_count:
                log("OK", "🎉 线上条数与本地一致,数据已是最新状态,无需抓取!")
                return

            new_items_count = remote_count - local_count
            if new_items_count > 0:
                log("INFO", f"🔥 发现大致 {new_items_count} 条新增数据!准备从第 1 页开始扫描录入...")
            else:
                log("INFO", f"⚠️ 线上条数 ({remote_count}) 少于本地条数 ({local_count}),可能存在数据删除。仍将扫描第一页验证更新。")

            # ---------------------------------------------------------
            # Process page 1 (already in `payload`), then keep paging
            # until a page brings no new rows.
            # ---------------------------------------------------------
            current_page = 1
            cursor = conn.cursor()
            total_inserted = 0
            while True:
                if not (isinstance(payload, dict) and "result" in payload):
                    log("ERR", f"{current_page} 页数据结构异常,中止。")
                    break
                items = payload["result"].get("items", [])
                if not items:
                    log("WARN", f"{current_page} 页返回了空列表,已无数据。")
                    break

                # Log the first record of the page as a debugging aid.
                first_item = items[0]
                log("INFO", f"🔍 正在检查本页第一条数据: 发料单 {first_item.get('workOrdersNumber')} 行号 {first_item.get('lineNumber')} 物料 {first_item.get('materialCode')}")

                inserted_this_page = 0
                for raw_item in items:
                    if not item_exists(cursor, raw_item):
                        _insert_item(cursor, raw_item)
                        inserted_this_page += 1
                total_inserted += inserted_this_page
                conn.commit()
                log("OK", f"{current_page} 页处理完毕,成功插入 {inserted_this_page} 条新数据。")

                # A later page without any new row means we caught up with
                # the data already stored; page 1 alone never stops the scan.
                if inserted_this_page == 0 and current_page > 1:
                    log("OK", "🎉 本页未发现任何新数据,说明增量部分已全部抓取完毕,停止翻页!")
                    break

                delay = random.uniform(1.5, 3.5)
                log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
                time.sleep(delay)

                next_btn = _find_next_button(page)
                if not next_btn:
                    log("ERR", "重试 3 次后仍然找不到下一页按钮!")
                    break
                if _is_disabled(next_btn):
                    log("OK", "🏁 下一页按钮已被禁用,已经翻到最后一页。")
                    break

                try:
                    # Prefer a JS click so overlays cannot block it.
                    page.run_js("arguments[0].click();", next_btn)
                except Exception as e:
                    log("ERR", f"JS 点击失败: {e},尝试普通点击...")
                    next_btn.click()

                packet = page.listen.wait(timeout=15)
                if not packet:
                    log("ERR", f"{current_page + 1} 页请求超时!")
                    break
                payload = _decode_body(packet.response.body)
                current_page += 1

            log("OK", f"🎉 发料单增量同步大功告成!总计新增了 {total_inserted} 条记录入库!")
        except Exception as e:
            log("ERR", f"发生全局异常: {e}")
        finally:
            try:
                page.listen.stop()
            except Exception:
                # Best effort: stopping the listener must never mask the outcome.
                pass
    finally:
        # Close the connection on every path, including failures that happen
        # before the browser page is available.
        conn.close()
# Run the incremental sync only when this file is executed as a script.
if __name__ == "__main__":
    fetch_issue_receipt_incremental()