"""Issue-receipt (发料单) report - incremental sync script.

The newest rows are always on the first page of the ERP report, so the script
starts at page 1 and walks forward:

1. Open the local SQLite database and count the rows already stored.
2. Log in to the ERP system, open the issue-receipt detail page and intercept
   the JSON responses of the list API.
3. Insert every row of a page that is not yet in the local database.
4. Stop as soon as a page after the first one yields no new row, when the
   "next page" button is disabled, or when a page comes back empty.
"""
import calendar
import datetime
import json
import math  # unused in the visible code; kept because the file imported it
import random
import sqlite3
import sys
import time
import traceback
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from login import get_page, log, login
from config import DB_PATH

HOME_URL = "https://yunmes.tftykj.cn/"
API_TARGET = "WorkOrdersDetailed_SearchListAll_Proxy"

# Detail page that is opened directly after login.
_QUERY_URL = HOME_URL + "WorkOrdersQuery"

# Single source of truth for the 36 synced fields, in INSERT order:
# (label used by _extract_fields, key in the API item, SQLite column).
_FIELD_SPECS = (
    ("生产任务单号", "productionOrderNo", "production_order_no"),
    ("生产物料代码", "productMaterialCode", "product_material_code"),
    ("生产物料名称", "productMaterialName", "product_material_name"),
    ("生产物料规格", "productMaterialSpecification", "product_material_specification"),
    ("发料单号", "workOrdersNumber", "work_orders_number"),
    ("状态", "status", "status"),
    ("物料规格", "materialSpecification", "material_specification"),
    ("物料名称", "materialName", "material_name"),
    ("物料代码", "materialCode", "material_code"),
    ("发料数量", "issueNumber", "issue_number"),
    ("已发料数量", "hasIssueNumber", "has_issue_number"),
    ("金额", "amount", "amount"),
    ("成本价", "costPrice", "cost_price"),
    ("发料金额", "issueAmount", "issue_amount"),
    ("生产订单备注", "productionOrderRemark", "production_order_remark"),
    ("明细备注", "detailedRemark", "detailed_remark"),
    ("单位名称", "unitName", "unit_name"),
    ("仓库名称", "warehouseName", "warehouse_name"),
    ("行号", "lineNumber", "line_number"),
    ("发料单备注", "workOrdersRemark", "work_orders_remark"),
    ("执行人名称", "executorUserName", "executor_user_name"),
    ("物料型号", "materialModel", "material_model"),
    ("执行时间", "executionTime", "execution_time"),
    ("领料人", "materialsUserName", "materials_user_name"),
    ("生产物料型号", "productMaterialModel", "product_material_model"),
    ("自定义字段", "customField", "custom_field"),
    ("部门代码", "departmentInformationCode", "department_information_code"),
    ("部门名称", "departmentInformationName", "department_information_name"),
    ("图片文件", "imageFile", "image_file"),
    ("汇总金额", "issueAmountTotal", "issue_amount_total"),
    ("物料组代码", "materialGroupCode", "material_group_code"),
    ("物料组名称", "materialGroupName", "material_group_name"),
    ("单价小数位数", "numnberOfReservedDigits", "numnber_of_reserved_digits"),
    ("单价进位策略", "placeMentStrategy", "place_ment_strategy"),
    ("单价", "price", "price"),
    ("销售订单号", "salesOrderCode", "sales_order_code"),
)

_INSERT_SQL = "INSERT INTO issue_receipt_details ({}) VALUES ({})".format(
    ", ".join(column for _, _, column in _FIELD_SPECS),
    ", ".join("?" for _ in _FIELD_SPECS),
)


def get_local_count(conn):
    """Return the number of rows currently stored in issue_receipt_details."""
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM issue_receipt_details")
    return cursor.fetchone()[0]


def item_exists(cursor, item):
    """Return True if *item* is already stored in issue_receipt_details.

    A row is identified by the combination of issue-receipt number, line
    number and material code. If any of the three is missing (None) the item
    is never reported as a duplicate.
    """
    wo_number = item.get("workOrdersNumber")
    line_no = item.get("lineNumber")
    mat_code = item.get("materialCode")

    # Compare against None explicitly: a plain truthiness test would treat a
    # legitimate line number 0 as "missing" and re-insert that row every run.
    if wo_number is None or line_no is None or mat_code is None:
        return False

    cursor.execute(
        '''
        SELECT 1 FROM issue_receipt_details
        WHERE work_orders_number = ? AND line_number = ? AND material_code = ?
        ''',
        (wo_number, line_no, mat_code),
    )
    return cursor.fetchone() is not None


def _extract_fields(item):
    """Map one raw API item to a dict keyed by the Chinese field labels."""
    return {label: item.get(api_key) for label, api_key, _ in _FIELD_SPECS}


def _insert_item(cursor, raw_item):
    """Insert one raw API item as a new row (the caller commits)."""
    fields = _extract_fields(raw_item)
    cursor.execute(_INSERT_SQL, tuple(fields[label] for label, _, _ in _FIELD_SPECS))


def _parse_body(body):
    """Return the response body as Python data, decoding JSON text if needed."""
    return body if isinstance(body, (dict, list)) else json.loads(body)


def _remote_total(data):
    """Return the total row count reported by the API, or None if unavailable.

    NOTE(review): the key name "totalCount" is an assumption; the original
    code that read this value was lost from the file. Confirm against a real
    API response.
    """
    result = data.get("result") if isinstance(data, dict) else None
    total = result.get("totalCount") if isinstance(result, dict) else None
    return total if isinstance(total, int) else None


def _set_month_range(page):
    """Fill the two date inputs with the first and last day of this month."""
    today = datetime.date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    first_day = today.replace(day=1).strftime('%Y-%m-%d')
    last_day = today.replace(day=days_in_month).strftime('%Y-%m-%d')
    log("INFO", f"正在自动设置查询时间范围: {first_day} 至 {last_day}")
    page.run_js(f"""
        try {{
            var dates = document.querySelectorAll('.datebox-f, .datetimebox-f, .el-date-editor input');
            if (dates.length >= 2) {{
                dates[0].value = '{first_day}';
                dates[1].value = '{last_day}';
                dates[0].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[0].dispatchEvent(new Event('change', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('input', {{ bubbles: true }}));
                dates[1].dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}
        }} catch(e) {{ console.log(e); }}
    """)


def _click_query_button(page):
    """Locate the query (查询) button and click it, trying several strategies."""
    log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
    query_btn = page.ele('text=查询', timeout=3)
    if not query_btn:
        query_btn = page.ele('xpath://button[contains(., "查询")]', timeout=3)

    if query_btn:
        try:
            query_btn.click()
        except Exception:
            # A native click can fail when the button is covered; use JS.
            page.run_js("arguments[0].click();", query_btn)
        return

    log("WARN", "常规选择器找不到查询按钮,尝试使用全局 JS 强行寻找...")
    # NOTE(review): the loop body of this script was lost from the file and
    # has been reconstructed (click the first element whose text contains
    # 查询). Confirm against version control.
    page.run_js("""
        var btns = document.querySelectorAll('button, a, .l-btn, .el-button');
        for (var i = 0; i < btns.length; i++) {
            if ((btns[i].innerText || '').indexOf('查询') !== -1) {
                btns[i].click();
                break;
            }
        }
    """)


def _find_next_button(page):
    """Return the "next page" button, or a falsy value if none is found."""
    for _ in range(3):
        # Prefer pagination-next, then fall back to btn-next.
        button = page.ele('xpath://*[contains(@class, "pagination-next")]', timeout=3)
        if not button:
            button = page.ele('xpath://button[contains(@class, "btn-next")]', timeout=3)
        if button:
            return button
        time.sleep(1)
    return page.ele(
        'xpath://i[contains(@class, "el-icon-arrow-right")]/parent::button', timeout=3
    )


def _is_disabled(button):
    """Return True if the button or its parent is marked as disabled."""
    class_str = str(button.attr("class"))
    aria_disabled = button.attr("aria-disabled")
    has_disabled_attr = button.attr("disabled") is not None

    parent_class_str = ""
    try:
        parent_class_str = str(button.parent().attr("class"))
    except Exception:
        # Best effort only: the parent lookup is an extra hint, not required.
        pass

    return (
        "disabled" in class_str
        or "disabled" in parent_class_str
        or has_disabled_attr
        or aria_disabled == "true"
    )


def _run_sync(conn, page, local_count):
    """Log in, load the report and store every row that is not yet local."""
    log("INFO", "检查登录状态并执行登录...")
    if not login(page):
        log("ERR", "登录 ERP 系统失败,中止抓取。")
        return

    log("INFO", "正在直接跳转到发料单明细页面...")
    page.get(_QUERY_URL)
    page.wait.load_start()
    time.sleep(2)

    log("INFO", f"开启底层数据拦截网: {API_TARGET}")
    page.listen.start(API_TARGET)

    # Restrict the query to the current month.
    _set_month_range(page)
    time.sleep(1)

    # Opening the URL directly usually does not load data by itself, so click
    # the query button when no response shows up on its own.
    packet = page.listen.wait(timeout=10)
    if not packet:
        _click_query_button(page)
        # NOTE(review): everything from here to the count comparison below was
        # lost from the file and has been reconstructed. Confirm it.
        packet = page.listen.wait(timeout=15)
    if not packet:
        log("ERR", "等待发料单数据接口响应超时,中止抓取。")
        return

    data = _parse_body(packet.response.body)
    remote_count = _remote_total(data)
    if remote_count is None:
        log("WARN", "接口响应中未找到总记录数,跳过条数对比,直接扫描第一页。")
    else:
        log("INFO", f"🌐 线上当前总计: {remote_count} 条数据")
        new_items_count = remote_count - local_count
        if new_items_count > 0:
            log("INFO", f"🔥 发现大致 {new_items_count} 条新增数据!准备从第 1 页开始扫描录入...")
        elif new_items_count == 0:
            # NOTE(review): the original handling of equal counts is unknown;
            # scanning anyway is the safe choice.
            log("INFO", "线上条数与本地条数一致。仍将扫描第一页验证更新。")
        else:
            log("INFO", f"⚠️ 线上条数 ({remote_count}) 少于本地条数 ({local_count}),可能存在数据删除。仍将扫描第一页验证更新。")

    # ---------------------------------------------------------------
    # Process page 1 (already captured above), then keep turning pages
    # until a page brings nothing new.
    # ---------------------------------------------------------------
    cursor = conn.cursor()
    current_page = 1
    total_inserted = 0
    page_data = data

    while True:
        result = page_data.get("result") if isinstance(page_data, dict) else None
        if not isinstance(result, dict):
            log("ERR", f"第 {current_page} 页数据结构异常,中止。")
            break

        items = result.get("items", [])
        if not items:
            log("WARN", f"第 {current_page} 页返回了空列表,已无数据。")
            break

        # Log the first row of the page to help debugging.
        first_item = items[0]
        log("INFO", f"🔍 正在检查本页第一条数据: 发料单 {first_item.get('workOrdersNumber')} 行号 {first_item.get('lineNumber')} 物料 {first_item.get('materialCode')}")

        inserted_this_page = 0
        for raw_item in items:
            if not item_exists(cursor, raw_item):
                _insert_item(cursor, raw_item)
                inserted_this_page += 1
        conn.commit()
        total_inserted += inserted_this_page
        log("OK", f"第 {current_page} 页处理完毕,成功插入 {inserted_this_page} 条新数据。")

        # A page (after the first) without any new row means the stored data
        # has been reached. Page 1 never stops the scan on its own.
        if inserted_this_page == 0 and current_page > 1:
            log("OK", "🎉 本页未发现任何新数据,说明增量部分已全部抓取完毕,停止翻页!")
            break

        delay = random.uniform(1.5, 3.5)
        log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
        time.sleep(delay)

        next_btn = _find_next_button(page)
        if not next_btn:
            log("ERR", "重试 3 次后仍然找不到下一页按钮!")
            break

        if _is_disabled(next_btn):
            log("OK", "🏁 下一页按钮已被禁用,已经翻到最后一页。")
            break

        try:
            # Prefer a JS click so overlays cannot block it.
            page.run_js("arguments[0].click();", next_btn)
        except Exception as e:
            log("ERR", f"JS 点击失败: {e},尝试普通点击...")
            next_btn.click()

        packet = page.listen.wait(timeout=15)
        if not packet:
            log("ERR", f"第 {current_page + 1} 页请求超时!")
            break

        # Prepare the data for the next iteration.
        page_data = _parse_body(packet.response.body)
        current_page += 1

    log("OK", f"🎉 发料单增量同步大功告成!总计新增了 {total_inserted} 条记录入库!")


def fetch_issue_receipt_incremental():
    """Run one incremental sync of the issue-receipt report into SQLite.

    Returns None. The function returns early (after logging) if the database
    file does not exist. Errors raised while opening the database, counting
    local rows or obtaining the browser page propagate to the caller; errors
    during the sync itself are logged with a traceback and swallowed. The
    database connection and the browser are always released.
    """
    log("INFO", "=== 🚀 启动发料单报表 - 智能增量同步 (首屏更新模式) ===")

    if not DB_PATH.exists():
        log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
        return

    conn = sqlite3.connect(DB_PATH)
    page = None
    try:
        local_count = get_local_count(conn)
        log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")

        # Deliberately pass no port so login.py's default applies and its
        # auto_port logic for Docker environments is triggered.
        page = get_page()

        try:
            _run_sync(conn, page, local_count)
        except Exception as e:
            log("ERR", f"发生全局异常: {e}\n{traceback.format_exc()}")
    finally:
        conn.close()
        if page:
            try:
                page.listen.stop()
            except Exception:
                pass
            try:
                # The browser must be shut down to release port 9222 and its
                # memory; a leftover zombie process leads to 404 Not Found.
                page.quit()
            except Exception:
                pass


if __name__ == "__main__":
    fetch_issue_receipt_incremental()