333 lines
16 KiB
Python
333 lines
16 KiB
Python
"""
|
||
发料单报表 - 智能增量同步脚本 (从第一页开始抓,遇到旧数据即停)
|
||
目标:
|
||
1. 自动连接本地 SQLite 数据库查询是否存在某条记录。
|
||
2. 进入 ERP 系统截获发料单数据,由于新数据都在第一页,我们从第 1 页开始抓。
|
||
3. 逐条对比,如果发现某页的数据在本地已经存在,则认为增量部分已经抓取完毕,提前终止。
|
||
4. 将新增数据存入 SQLite。
|
||
"""
|
||
import sys
|
||
import json
|
||
import time
|
||
import math
|
||
import random
|
||
import sqlite3
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from login import get_page, log
|
||
from config import DB_PATH
|
||
|
||
HOME_URL = "https://yunmes.tftykj.cn/"
|
||
API_TARGET = "WorkOrdersDetailed_SearchListAll_Proxy"
|
||
|
||
def get_local_count(conn):
|
||
"""获取本地数据库已有的总记录数"""
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) FROM issue_receipt_details")
|
||
return cursor.fetchone()[0]
|
||
|
||
def item_exists(cursor, item):
|
||
"""判断某条发料明细是否已在数据库中存在(基于 发料单号 + 行号 + 物料代码 组合判断)"""
|
||
wo_number = item.get("workOrdersNumber")
|
||
line_no = item.get("lineNumber")
|
||
mat_code = item.get("materialCode")
|
||
|
||
# 增加一个容错判断,如果其中有 None 就不当作重复
|
||
if not wo_number or not line_no or not mat_code:
|
||
return False
|
||
|
||
cursor.execute('''
|
||
SELECT 1 FROM issue_receipt_details
|
||
WHERE work_orders_number = ? AND line_number = ? AND material_code = ?
|
||
''', (wo_number, line_no, mat_code))
|
||
return cursor.fetchone() is not None
|
||
|
||
def _extract_fields(item):
|
||
"""提取所需的字段"""
|
||
return {
|
||
"生产任务单号": item.get("productionOrderNo"),
|
||
"生产物料代码": item.get("productMaterialCode"),
|
||
"生产物料名称": item.get("productMaterialName"),
|
||
"生产物料规格": item.get("productMaterialSpecification"),
|
||
"发料单号": item.get("workOrdersNumber"),
|
||
"状态": item.get("status"),
|
||
"物料规格": item.get("materialSpecification"),
|
||
"物料名称": item.get("materialName"),
|
||
"物料代码": item.get("materialCode"),
|
||
"发料数量": item.get("issueNumber"),
|
||
"已发料数量": item.get("hasIssueNumber"),
|
||
"金额": item.get("amount"),
|
||
"成本价": item.get("costPrice"),
|
||
"发料金额": item.get("issueAmount"),
|
||
"生产订单备注": item.get("productionOrderRemark"),
|
||
"明细备注": item.get("detailedRemark"),
|
||
"单位名称": item.get("unitName"),
|
||
"仓库名称": item.get("warehouseName"),
|
||
"行号": item.get("lineNumber"),
|
||
"发料单备注": item.get("workOrdersRemark"),
|
||
"执行人名称": item.get("executorUserName"),
|
||
"物料型号": item.get("materialModel"),
|
||
"执行时间": item.get("executionTime"),
|
||
"领料人": item.get("materialsUserName"),
|
||
"生产物料型号": item.get("productMaterialModel"),
|
||
"自定义字段": item.get("customField"),
|
||
"部门代码": item.get("departmentInformationCode"),
|
||
"部门名称": item.get("departmentInformationName"),
|
||
"图片文件": item.get("imageFile"),
|
||
"汇总金额": item.get("issueAmountTotal"),
|
||
"物料组代码": item.get("materialGroupCode"),
|
||
"物料组名称": item.get("materialGroupName"),
|
||
"单价小数位数": item.get("numnberOfReservedDigits"),
|
||
"单价进位策略": item.get("placeMentStrategy"),
|
||
"单价": item.get("price"),
|
||
"销售订单号": item.get("salesOrderCode")
|
||
}
|
||
|
||
def fetch_issue_receipt_incremental():
|
||
log("INFO", "=== 🚀 启动发料单报表 - 智能增量同步 (首屏更新模式) ===")
|
||
|
||
if not DB_PATH.exists():
|
||
log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
|
||
return
|
||
|
||
conn = sqlite3.connect(DB_PATH)
|
||
local_count = get_local_count(conn)
|
||
log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")
|
||
|
||
# 强制不传 port 参数,使用 login.py 里的默认值,以触发 Docker 环境下的 auto_port 逻辑
|
||
page = get_page()
|
||
|
||
try:
|
||
# 如果是首次打开浏览器(没有保持登录状态),先调用 login
|
||
from login import login
|
||
log("INFO", "检查登录状态并执行登录...")
|
||
if not login(page):
|
||
log("ERR", "登录 ERP 系统失败,中止抓取。")
|
||
return
|
||
|
||
log("INFO", f"正在直接跳转到发料单明细页面...")
|
||
page.get("https://yunmes.tftykj.cn/WorkOrdersQuery")
|
||
page.wait.load_start()
|
||
time.sleep(2)
|
||
|
||
log("INFO", f"开启底层数据拦截网: {API_TARGET}")
|
||
page.listen.start(API_TARGET)
|
||
|
||
# 为了能够获取当月的数据,强制设置时间为当月第一天到最后一天,并清理其他条件
|
||
import datetime, calendar
|
||
now = datetime.datetime.now()
|
||
first_day = datetime.date(now.year, now.month, 1).strftime('%Y-%m-%d')
|
||
last_day = datetime.date(now.year, now.month, calendar.monthrange(now.year, now.month)[1]).strftime('%Y-%m-%d')
|
||
|
||
log("INFO", f"正在自动设置查询时间范围: {first_day} 至 {last_day}")
|
||
|
||
page.run_js(f"""
|
||
try {{
|
||
var dates = document.querySelectorAll('.datebox-f, .datetimebox-f, .el-date-editor input');
|
||
if (dates.length >= 2) {{
|
||
dates[0].value = '{first_day}';
|
||
dates[1].value = '{last_day}';
|
||
dates[0].dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||
dates[0].dispatchEvent(new Event('change', {{ bubbles: true }}));
|
||
dates[1].dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||
dates[1].dispatchEvent(new Event('change', {{ bubbles: true }}));
|
||
}}
|
||
}} catch(e) {{ console.log(e); }}
|
||
""")
|
||
time.sleep(1)
|
||
|
||
# 因为是直接打开 URL,数据通常不会自动加载,所以尝试点击查询按钮
|
||
packet = page.listen.wait(timeout=10)
|
||
if not packet:
|
||
# 兼容多种查询按钮的查找方式
|
||
log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
|
||
query_btn = page.ele('text=查询', timeout=3)
|
||
if not query_btn:
|
||
query_btn = page.ele('xpath://button[contains(., "查询")]', timeout=3)
|
||
|
||
if query_btn:
|
||
try: query_btn.click()
|
||
except: page.run_js("arguments[0].click();", query_btn)
|
||
else:
|
||
log("WARN", "常规选择器找不到查询按钮,尝试使用全局 JS 强行寻找...")
|
||
page.run_js("""
|
||
var btns = document.querySelectorAll('button, a, .l-btn, .el-button');
|
||
for(var i=0; i<btns.length; i++) {
|
||
if(btns[i].innerText && btns[i].innerText.indexOf('查询') !== -1) {
|
||
btns[i].click();
|
||
break;
|
||
}
|
||
}
|
||
""")
|
||
packet = page.listen.wait(timeout=15)
|
||
|
||
if not packet:
|
||
log("ERR", "未能拦截到第一页数据,无法获取线上总条数。")
|
||
return
|
||
|
||
body = packet.response.body
|
||
data = body if isinstance(body, (dict, list)) else json.loads(body)
|
||
|
||
remote_count = 0
|
||
if isinstance(data, dict) and "result" in data:
|
||
remote_count = data["result"].get("totalCount", 0)
|
||
|
||
log("INFO", f"🌐 线上 ERP 系统当前总条数: {remote_count} 条")
|
||
|
||
if remote_count == local_count:
|
||
log("OK", "🎉 线上条数与本地一致,数据已是最新状态,无需抓取!")
|
||
return
|
||
|
||
new_items_count = remote_count - local_count
|
||
if new_items_count > 0:
|
||
log("INFO", f"🔥 发现大致 {new_items_count} 条新增数据!准备从第 1 页开始扫描录入...")
|
||
else:
|
||
log("INFO", f"⚠️ 线上条数 ({remote_count}) 少于本地条数 ({local_count}),可能存在数据删除。仍将扫描第一页验证更新。")
|
||
|
||
# =========================================================
|
||
# 开始处理第一页,并循环往后翻,直到遇到重复数据
|
||
# =========================================================
|
||
current_page = 1
|
||
cursor = conn.cursor()
|
||
total_inserted = 0
|
||
|
||
# 第一次的数据已经在上面的 packet 里了,直接处理
|
||
first_page_data = data
|
||
|
||
while True:
|
||
should_stop = False
|
||
inserted_this_page = 0
|
||
|
||
if isinstance(first_page_data, dict) and "result" in first_page_data:
|
||
items = first_page_data["result"].get("items", [])
|
||
if not items:
|
||
log("WARN", f"第 {current_page} 页返回了空列表,已无数据。")
|
||
break
|
||
|
||
# 打印第一条数据的信息,用于调试
|
||
if items:
|
||
first_item = items[0]
|
||
log("INFO", f"🔍 正在检查本页第一条数据: 发料单 {first_item.get('workOrdersNumber')} 行号 {first_item.get('lineNumber')} 物料 {first_item.get('materialCode')}")
|
||
|
||
for raw_item in items:
|
||
# 1. 如果不存在,提取并插入
|
||
if not item_exists(cursor, raw_item):
|
||
item = _extract_fields(raw_item)
|
||
|
||
cursor.execute('''
|
||
INSERT INTO issue_receipt_details (
|
||
production_order_no, product_material_code, product_material_name, product_material_specification,
|
||
work_orders_number, status, material_specification, material_name, material_code,
|
||
issue_number, has_issue_number, amount, cost_price, issue_amount,
|
||
production_order_remark, detailed_remark, unit_name, warehouse_name, line_number,
|
||
work_orders_remark, executor_user_name, material_model, execution_time, materials_user_name,
|
||
product_material_model, custom_field, department_information_code, department_information_name,
|
||
image_file, issue_amount_total, material_group_code, material_group_name,
|
||
numnber_of_reserved_digits, place_ment_strategy, price, sales_order_code
|
||
) VALUES (
|
||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
|
||
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
|
||
)
|
||
''', (
|
||
item.get("生产任务单号"), item.get("生产物料代码"), item.get("生产物料名称"), item.get("生产物料规格"),
|
||
item.get("发料单号"), item.get("状态"), item.get("物料规格"), item.get("物料名称"), item.get("物料代码"),
|
||
item.get("发料数量"), item.get("已发料数量"), item.get("金额"), item.get("成本价"), item.get("发料金额"),
|
||
item.get("生产订单备注"), item.get("明细备注"), item.get("单位名称"), item.get("仓库名称"), item.get("行号"),
|
||
item.get("发料单备注"), item.get("执行人名称"), item.get("物料型号"), item.get("执行时间"), item.get("领料人"),
|
||
item.get("生产物料型号"), item.get("自定义字段"), item.get("部门代码"), item.get("部门名称"),
|
||
item.get("图片文件"), item.get("汇总金额"), item.get("物料组代码"), item.get("物料组名称"),
|
||
item.get("单价小数位数"), item.get("单价进位策略"), item.get("单价"), item.get("销售订单号")
|
||
))
|
||
inserted_this_page += 1
|
||
total_inserted += 1
|
||
|
||
conn.commit()
|
||
log("OK", f"第 {current_page} 页处理完毕,成功插入 {inserted_this_page} 条新数据。")
|
||
|
||
else:
|
||
log("ERR", f"第 {current_page} 页数据结构异常,中止。")
|
||
break
|
||
|
||
# 如果当页没有新数据插入,说明已经追上了旧数据,停止抓取
|
||
if inserted_this_page == 0 and current_page > 1:
|
||
log("OK", "🎉 本页未发现任何新数据,说明增量部分已全部抓取完毕,停止翻页!")
|
||
break
|
||
|
||
# 如果没遇到旧数据,继续点击下一页
|
||
delay = random.uniform(1.5, 3.5)
|
||
log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
|
||
time.sleep(delay)
|
||
|
||
next_btn = None
|
||
for _ in range(3):
|
||
# 优先使用 pagination-next,如果不行再尝试 btn-next
|
||
next_btn = page.ele('xpath://*[contains(@class, "pagination-next")]', timeout=3)
|
||
if not next_btn:
|
||
next_btn = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
|
||
if next_btn:
|
||
break
|
||
time.sleep(1)
|
||
|
||
if not next_btn:
|
||
next_btn = page.ele('xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3)
|
||
|
||
if next_btn:
|
||
# 检查按钮是否被禁用
|
||
class_str = str(next_btn.attr("class"))
|
||
aria_disabled = next_btn.attr("aria-disabled")
|
||
is_disabled_attr = next_btn.attr("disabled") is not None
|
||
|
||
# 检查父元素 <li> 是否被禁用
|
||
parent_class_str = ""
|
||
try:
|
||
parent_ele = next_btn.parent()
|
||
parent_class_str = str(parent_ele.attr("class"))
|
||
except:
|
||
pass
|
||
|
||
if "disabled" in class_str or "disabled" in parent_class_str or is_disabled_attr or aria_disabled == "true":
|
||
log("OK", "🏁 下一页按钮已被禁用,已经翻到最后一页。")
|
||
break
|
||
|
||
try:
|
||
# 优先使用 JS 点击防止遮挡
|
||
page.run_js("arguments[0].click();", next_btn)
|
||
except Exception as e:
|
||
log("ERR", f"JS 点击失败: {e},尝试普通点击...")
|
||
next_btn.click()
|
||
|
||
packet = page.listen.wait(timeout=15)
|
||
if not packet:
|
||
log("ERR", f"第 {current_page + 1} 页请求超时!")
|
||
break
|
||
|
||
# 为下一轮循环准备数据
|
||
body = packet.response.body
|
||
first_page_data = body if isinstance(body, (dict, list)) else json.loads(body)
|
||
else:
|
||
log("ERR", "重试 3 次后仍然找不到下一页按钮!")
|
||
break
|
||
|
||
current_page += 1
|
||
|
||
log("OK", f"🎉 发料单增量同步大功告成!总计新增了 {total_inserted} 条记录入库!")
|
||
|
||
except Exception as e:
|
||
log("ERR", f"发生全局异常: {e}")
|
||
finally:
|
||
if 'conn' in locals() and conn:
|
||
conn.close()
|
||
if 'page' in locals() and page:
|
||
try:
|
||
page.listen.stop()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
# 必须强制退出浏览器,释放 9222 端口和内存,防止产生僵尸进程导致 404 Not Found
|
||
page.quit()
|
||
except Exception:
|
||
pass
|
||
|
||
if __name__ == "__main__":
|
||
fetch_issue_receipt_incremental() |