新内容

This commit is contained in:
Jimmy
2026-04-27 15:23:00 +08:00
parent fc8f14b301
commit 29954a7af0
8 changed files with 1072 additions and 2 deletions

View File

@@ -0,0 +1,80 @@
"""
ERP 数据同步器 - 自动化浏览器拉起与登录守护模块
目标:
1. 自动寻找本地安装的 Chrome 浏览器。
2. 以 9222 端口和独立的用户数据目录启动(互不干扰,持久化登录状态)。
3. 弹出 ERP 登录页面,等待用户手动登录(解决滑块验证码等问题)。
4. 登录成功后,将浏览器挂在后台作为保活引擎。
"""
import sys
import time
from pathlib import Path
from DrissionPage import ChromiumOptions, ChromiumPage
from config import DATA_DIR
# Dedicated Chrome user-data directory. It lives under the persistent DATA_DIR
# so that the login session is not lost when the program restarts.
USER_DATA_DIR = DATA_DIR / "browser_login" / "chrome_user_data"
# URL that is opened in the launched browser for the manual login.
HOME_URL = "https://yunmes.tftykj.cn/"
def start_and_wait_login():
    """Launch Chrome on debug port 9222 and block until the ERP login is detected.

    The browser is configured with the dedicated ``USER_DATA_DIR`` profile,
    opened on ``HOME_URL``, and then polled for a main-menu element that is
    only expected to be present once the user has logged in manually.

    Returns:
        The ``ChromiumPage`` instance once the menu element has been found,
        or ``None`` when the browser could not be started.

    Note:
        The wait has no timeout; it ends when the element appears or when the
        process is interrupted (Ctrl+C now propagates out of the loop).
    """
    print("🚀 [1/3] 正在配置内置 Chrome 浏览器引擎...")
    # Build the launch options.
    co = ChromiumOptions()
    co.set_local_port(9222)
    # Use a separate user-data folder for this browser instance.
    co.set_user_data_path(str(USER_DATA_DIR))
    # Ignore certificate errors.
    co.ignore_certificate_errors()

    print("🌍 [2/3] 正在拉起浏览器并前往 ERP 登录页...")
    try:
        # Attaches to (or starts) the browser described by the options above.
        page = ChromiumPage(co)
    except Exception as e:
        print(f"❌ 启动浏览器失败,请确保电脑安装了 Chrome 浏览器!报错信息: {e}")
        return None

    page.get(HOME_URL)
    print("\n" + "="*50)
    print("👀 [等待人工介入] 请在弹出的浏览器窗口中完成登录操作!")
    print("💡 提示: 输入账号密码、通过滑块验证码,直到进入 ERP 系统主界面。")
    print("="*50 + "\n")

    # Poll for an element of the main menu; while the login page is still
    # shown the lookup is expected to come back empty.
    # NOTE(review): the XPath is layout-dependent and may need adjusting to the
    # real post-login page.
    is_logged_in = False
    while not is_logged_in:
        time.sleep(2)
        try:
            menu_ele = page.ele('xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p', timeout=1)
            if menu_ele:
                is_logged_in = True
        except Exception:
            # A failed lookup just means "not logged in yet".  Only Exception
            # is caught so that KeyboardInterrupt/SystemExit can still end the
            # otherwise unbounded wait.
            pass
        if not is_logged_in:
            print("⏳ 等待登录中...")

    print("\n✅ [3/3] 检测到登录成功!")
    print("🔒 登录状态已保存,你可以随时关闭或者最小化这个浏览器窗口。")
    print("🤖 爬虫引擎已挂载至后台,可以开始点击前端界面的【同步数据】按钮了!\n")
    return page
if __name__ == "__main__":
    # Running this file on its own is enough to bring the browser up.
    page = start_and_wait_login()
    if page:
        # Keep the script alive with an endless sleep loop so it does not exit
        # after the login; Ctrl+C ends it.
        try:
            while True:
                time.sleep(10)
        except KeyboardInterrupt:
            print("👋 守护进程已退出。")

View File

@@ -14,11 +14,10 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, login, login_manual, log, dump_page_state
from config import OUTPUT_DIR
BOM_PAGE_URL = "https://yunmes.tftykj.cn/MaterialBom"
BOM_API_PATH = "MaterialBom_SearchList_Proxy"
OUTPUT_DIR = Path(__file__).parent / "output"
OUTPUT_DIR.mkdir(exist_ok=True)
# ── 导航到 BOM 页面 ───────────────────────────────────────────────────────────

17
browser_login/config.py Normal file
View File

@@ -0,0 +1,17 @@
import sys
import os
from pathlib import Path
def get_data_dir():
    """Return the directory used for persistent data (database, output files).

    When ``sys.frozen`` is set to a truthy value the directory containing
    ``sys.executable`` is returned; otherwise the parent of the directory
    that holds this file is returned.
    """
    running_frozen = getattr(sys, 'frozen', False)
    base_dir = os.path.dirname(sys.executable) if running_frozen else Path(__file__).parent.parent
    return Path(base_dir)
DATA_DIR = get_data_dir()
# Shared output directory that holds the JSON files and the SQLite database.
OUTPUT_DIR = DATA_DIR / "browser_login" / "output"
# Created as an import-time side effect (parents included, no error if present).
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = OUTPUT_DIR / "erp_data.db"

View File

@@ -0,0 +1,227 @@
"""
BOM 成本 - 终极树状结构抓取脚本 (全站 1400+ 父件及 5 层嵌套子件)
目标:
1. 抓取所有父件(成本核算表主页)
2. 暗网请求所有父件下对应的 BOM 成本数据(扁平的 5 层数据)
3. 实时清洗并重组为完美嵌套的 JSON 树
"""
import sys
import json
import time
import random
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import OUTPUT_DIR
# Page that lists the parent items.
PAGE_URL = "https://yunmes.tftykj.cn/PartBomCostAccounting"
# Target handed to page.listen.start() to capture the parent-list API calls.
API_PARENT = "PartBomCostAccounting_SearchList_Proxy"
# File the final result is saved to.
TREE_FILE_PATH = OUTPUT_DIR / "bom_cost_full_tree_final.json"
def build_nested_tree(flat_items):
    """Turn a flat list of rows carrying ``sonNO``/``_parentId`` into a nested tree.

    Rows without a truthy ``sonNO`` are ignored.  A row whose ``_parentId`` is
    ``None`` becomes a root; any other row is attached to the ``sub_items`` of
    the row whose ``sonNO`` equals its ``_parentId`` (rows whose parent is not
    present are left out of the result).  The helper keys ``sonNO`` and
    ``_parentId`` are removed from the returned nodes, and leaf nodes carry no
    ``sub_items`` key.

    Returns:
        A list of root nodes; an empty list for empty/``None`` input.
    """
    if not flat_items:
        return []

    # Pass 1: index a trimmed copy of every usable row by its sonNO.
    nodes_by_id = {}
    for raw in flat_items:
        node_id = raw.get("sonNO")
        if node_id:
            nodes_by_id[node_id] = {
                "sonNO": node_id,
                "_parentId": raw.get("_parentId"),
                "bomLevel": raw.get("bomLevel"),
                "childMaterialCode": raw.get("childMaterialCode"),
                "childMaterialName": raw.get("childMaterialName"),
                "usageQty": raw.get("childrenMaterialConsumption", 1),
                "sub_items": [],
            }

    # Pass 2: hang every node under its parent, or collect it as a root.
    top_level = []
    for node in nodes_by_id.values():
        owner_id = node.get("_parentId")
        if owner_id is None:
            top_level.append(node)
            continue
        owner = nodes_by_id.get(owner_id)
        if owner:
            owner["sub_items"].append(node)

    # Pass 3: strip the bookkeeping keys from everything reachable from a root.
    pending = list(top_level)
    while pending:
        node = pending.pop()
        node.pop("sonNO", None)
        node.pop("_parentId", None)
        children = node["sub_items"]
        if children:
            pending.extend(children)
        else:
            node.pop("sub_items", None)

    return top_level
def _save_bom_progress(parents):
    """Write *parents* to TREE_FILE_PATH without the request-only ``_id``/``_materialId`` keys."""
    export_rows = []
    for entry in parents:
        row = dict(entry)
        row.pop("_id", None)
        row.pop("_materialId", None)
        export_rows.append(row)
    with open(TREE_FILE_PATH, "w", encoding="utf-8") as f:
        json.dump(export_rows, f, ensure_ascii=False, indent=2)


def fetch_bom_cost_tree():
    """Scrape all parent items and attach a nested BOM cost tree to each one.

    Phase 1 opens ``PAGE_URL``, listens for ``API_PARENT`` responses and pages
    through the parent list by clicking the "next" button, collecting the ids
    and code/name of every parent.

    Phase 2 runs, for each parent, an in-page jQuery ``$.ajax`` POST against
    the ``PartBom_SearchByTreeCost`` endpoint, rebuilds the returned flat list
    with ``build_nested_tree`` and stores it under ``bom_cost_tree``.  Progress
    is written to ``TREE_FILE_PATH`` after every 10th parent and after the
    last one.

    Any exception is logged and swallowed; nothing is returned.
    """
    log("INFO", "=== 🌳 启动 BOM 成本终极抓取 (多层嵌套自动重组) ===")
    page = get_page(port=9222)
    clean_parents_list = []

    try:
        # =========================================================
        # Phase 1: collect the basic information of every parent item
        # =========================================================
        log("INFO", f"正在访问安全的父件页面: {PAGE_URL}")
        page.get(PAGE_URL)
        page.wait.load_start()

        log("INFO", f"开启父件 API 网络监听: {API_PARENT}")
        page.listen.start(API_PARENT)
        try:
            page.refresh()

            current_page = 1
            total_records = 0

            while True:
                log("INFO", f"等待第 {current_page} 页父件 API 响应...")
                packet = page.listen.wait(timeout=20)
                if not packet:
                    log("ERR", f"超时未收到第 {current_page} 页数据,父件扫荡结束。")
                    break

                body = packet.response.body
                data = body if isinstance(body, (dict, list)) else json.loads(body)

                if isinstance(data, dict) and "result" in data:
                    items = data["result"].get("items", [])
                    total_records = data["result"].get("totalCount", 0)

                    for item in items:
                        # parentMaterialId is what the BOM cost API expects as
                        # its materialId parameter.
                        clean_parent = {
                            "_id": item.get("id"),  # sent as partBomCostAccountingId
                            "_materialId": item.get("parentMaterialId"),  # sent as materialId
                            "parentMaterialCode": item.get("parentMaterialCode"),
                            "parentMaterialName": item.get("parentMaterialName"),
                            "bom_cost_tree": []  # filled in during phase 2
                        }
                        clean_parents_list.append(clean_parent)

                    log("OK", f"提取了 {len(items)} 个父件。总进度: {len(clean_parents_list)}/{total_records}")

                    if len(clean_parents_list) >= total_records or len(items) == 0:
                        break
                else:
                    break

                # Move on to the next page.
                next_btn_xpath = "xpath:/html/body/div[1]/div/div[3]/table/tbody/tr/td[10]/a/span/span[2]"
                next_btn = page.ele(next_btn_xpath, timeout=5)
                if next_btn:
                    parent_a = next_btn.parent(2)
                    # attr() may yield None when there is no class attribute.
                    if parent_a and "disabled" in (parent_a.attr("class") or ""):
                        log("INFO", "已到达最后一页。")
                        break
                    page.run_js("arguments[0].click();", next_btn)
                    time.sleep(1.5)
                else:
                    log("WARN", "未找到下一页按钮,停止翻页。")
                    break

                current_page += 1
        finally:
            # Release the listener even when a page fails half-way.
            page.listen.stop()

        # =========================================================
        # Phase 2: request the flat BOM rows per parent and rebuild the tree
        # =========================================================
        total_parents = len(clean_parents_list)
        log("INFO", f"=== 🚀 开始为 {len(clean_parents_list)} 个父件抓取 BOM 成本树 ===")

        js_template = """
        return new Promise((resolve, reject) => {
            if (typeof $ !== 'undefined' && $.ajax) {
                $.ajax({
                    url: '/api/services/TfTechApi/PartBom/PartBom_SearchByTreeCost',
                    type: 'POST',
                    data: {
                        materialId: MATERIAL_ID_PLACEHOLDER,
                        partBomCostAccountingId: ACCOUNTING_ID_PLACEHOLDER,
                        childMaterialCode: '',
                        childMaterialName: '',
                        childMaterialSpecification: '',
                        childMaterialModel: ''
                    },
                    headers: {
                        'referer': 'https://yunmes.tftykj.cn/PartBomCostAccounting/Detail?id=ACCOUNTING_ID_PLACEHOLDER'
                    },
                    success: function(response) {
                        resolve({status: 'success', data: response});
                    },
                    error: function(xhr, status, error) {
                        resolve({status: 'error', data: xhr.responseText || error});
                    }
                });
            } else {
                resolve({status: 'error', data: 'No jQuery'});
            }
        });
        """

        for index, parent in enumerate(clean_parents_list):
            accounting_id = parent.get("_id")
            material_id = parent.get("_materialId")
            parent_code = parent.get("parentMaterialCode", "未知")

            # Parents without both ids cannot be queried, but they must still
            # reach the checkpoint below so the final save is never skipped.
            if accounting_id and material_id:
                log("INFO", f"[{index+1}/{len(clean_parents_list)}] 正在请求 BOM 成本树 (Code: {parent_code})...")

                js_code = js_template.replace("MATERIAL_ID_PLACEHOLDER", str(material_id)).replace("ACCOUNTING_ID_PLACEHOLDER", str(accounting_id))
                result = page.run_js(js_code)

                if result and result.get('status') == 'success':
                    data = result.get('data')
                    if isinstance(data, str):
                        try:
                            data = json.loads(data)
                        except ValueError:
                            # Not JSON: leave the raw string, the dict check
                            # below then skips this parent.
                            pass

                    if isinstance(data, dict) and "result" in data:
                        flat_items = data["result"]
                        if isinstance(flat_items, list):
                            # Rebuild the flat rows into the nested tree.
                            nested_tree = build_nested_tree(flat_items)
                            parent["bom_cost_tree"] = nested_tree
                            log("OK", f" └── 成功重组了一棵包含 {len(flat_items)} 个节点的多层树。")
                else:
                    log("ERR", f" └── 请求失败: {result.get('data') if result else '未知错误'}")

                time.sleep(random.uniform(0.3, 0.7))

            if (index + 1) % 10 == 0 or (index + 1) == total_parents:
                # Checkpoint everything processed so far.
                _save_bom_progress(clean_parents_list[:index+1])
                log("INFO", f"💾 进度已实时保存至 JSON ({index+1}/{len(clean_parents_list)})")

        log("OK", f"=== 🏆 终极 BOM 成本多层树状抓取完成!文件路径: {TREE_FILE_PATH} ===")

    except Exception as e:
        log("ERR", f"发生异常: {e}")
if __name__ == "__main__":
    # Run the BOM cost scrape when this file is executed as a script.
    fetch_bom_cost_tree()

View File

@@ -0,0 +1,217 @@
"""
收货明细报表 - 全量分页抓取 (精简字段模式)
目标: 模拟点击菜单,过滤 11 个核心字段,并循环点击下一页,直到所有数据抓取完毕。
"""
import sys
import json
import time
import random
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import OUTPUT_DIR
# Start page from which the menu navigation begins.
HOME_URL = "https://yunmes.tftykj.cn/"
# Target handed to page.listen.start() to capture the report's API responses.
API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy"
# JSON file that receives the cleaned records.
SAVE_PATH = OUTPUT_DIR / "receipt_details_full_clean.json"
def _clean_receipt_item(item):
    """Map one raw API row to the reduced record that is written to JSON.

    For both quantities the converted value is preferred; the plain value is
    used only when the converted one is ``None``.
    """
    planned_qty = item.get("convertPlannedPurchaseQuantity")
    if planned_qty is None:
        planned_qty = item.get("plannedPurchaseQuantity")
    goods_qty = item.get("convertGoodsQuantity")
    if goods_qty is None:
        goods_qty = item.get("goodsQuantity")
    return {
        "采购订单号": item.get("purchaseOrderCode"),
        "行号": item.get("rowsNum"),
        "物料代码": item.get("materialCode"),
        "物料名称": item.get("materialName"),
        "物料规格": item.get("materialSpecification"),
        "仓库代码": item.get("warehouseCode"),
        "仓库名称": item.get("warehouseName"),
        "供应商代码": item.get("supplierCode"),
        "供应商名称": item.get("supplierName"),
        "单位名称": item.get("unitName"),
        "转换单位": item.get("convertUnitName"),
        "收货单价": item.get("receivePrice"),
        "收货时间": item.get("receiptTime"),
        "进货数量": planned_qty,
        "收货数量": goods_qty,
        "收货总金额": item.get("receiveAmount")
    }


def _click_with_js_fallback(page, ele):
    """Click *ele* natively; if that raises, click it through JavaScript instead."""
    try:
        ele.click()
    except Exception:
        page.run_js("arguments[0].click();", ele)


def _write_receipt_json(path, records):
    """Dump *records* to *path* as indented UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)


def fetch_receipt_details_full():
    """Scrape every page of the receipt-details report into ``SAVE_PATH``.

    The function navigates to the report through three menu clicks, captures
    the ``API_TARGET`` responses, converts each row with
    ``_clean_receipt_item`` and keeps clicking "next page" (with randomised
    pauses) until the button is missing or disabled, a request times out, or
    a page comes back empty or malformed.  Records are checkpointed every 10
    pages and saved once more at the end.

    On an unexpected exception the records collected so far are written to
    ``receipt_details_RESCUE.json``.  Nothing is returned.
    """
    log("INFO", "=== 🚚 启动收货明细报表全量抓取 (精简字段模式) ===")
    page = get_page(port=9222)
    all_clean_items = []
    listening = False

    try:
        log("INFO", f"正在回到主页起点: {HOME_URL}")
        page.get(HOME_URL)
        page.wait.load_start()
        time.sleep(2)

        menus = [
            ("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'),
            ("第二层: 采购业务报表", 'xpath:/html/body/div[7]/div/div[1]/div/div[4]/div/p'),
            ("第三层: 收货明细报表", 'xpath:/html/body/div[8]/div/div[1]/div/div[4]/div/p')
        ]

        log("INFO", "开始模拟人工点击左侧导航菜单...")
        for name, xpath in menus:
            ele = page.ele(xpath, timeout=5)
            if ele:
                _click_with_js_fallback(page, ele)
                time.sleep(1.5)
            else:
                log("ERR", f"找不到菜单元素: {name}")
                return

        log("OK", "✅ 成功点开收货明细报表界面!")

        # Click an empty area so the menu is hidden again.
        blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div'
        blank_ele = page.ele(blank_xpath, timeout=3)
        if blank_ele:
            _click_with_js_fallback(page, blank_ele)
            time.sleep(0.5)

        log("INFO", f"开启底层数据拦截网: {API_TARGET}")
        page.listen.start(API_TARGET)
        listening = True

        packet = page.listen.wait(timeout=10)
        if not packet:
            # Nothing captured yet: trigger the request via the query button.
            log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
            query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span'
            query_btn = page.ele(query_btn_xpath, timeout=3)
            if query_btn:
                _click_with_js_fallback(page, query_btn)
                packet = page.listen.wait(timeout=15)

        if not packet:
            log("ERR", "未能拦截到第一页数据,可能网络超时或查询未触发。")
            return

        # =========================================================
        # First page
        # =========================================================
        log("OK", f"🎉 成功拦截到第一页数据HTTP: {packet.response.status}")
        body = packet.response.body
        data = body if isinstance(body, (dict, list)) else json.loads(body)

        total_count = 0
        if isinstance(data, dict) and "result" in data:
            total_count = data["result"].get("totalCount", 0)
            items = data["result"].get("items", [])
            all_clean_items.extend(_clean_receipt_item(item) for item in items)
            log("OK", f"第一页清洗完成,提取了 {len(items)} 条数据。后端报告总条数: {total_count}")

        page_num = 1

        # =========================================================
        # Remaining pages
        # =========================================================
        next_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/button[2]'

        while True:
            # Randomised pause of 2.5-5.5 seconds between page turns.
            delay = random.uniform(2.5, 5.5)
            log("INFO", f"⏳ 模拟真人停顿 {delay:.2f} 秒后,准备点击下一页...")
            time.sleep(delay)

            # Every 50 pages an additional 10-20 second break is taken.
            if page_num > 1 and page_num % 50 == 0:
                long_delay = random.uniform(10.0, 20.0)
                log("INFO", f"☕️ 已经连续高强度翻了 {page_num} 页,触发风控规避机制,假装喝水休息 {long_delay:.2f} 秒...")
                time.sleep(long_delay)

            next_btn = page.ele(next_btn_xpath, timeout=5)
            if not next_btn:
                log("ERR", "找不到下一页按钮,翻页中止。")
                break

            # A disabled "next" button marks the last page.
            class_str = str(next_btn.attr("class"))
            aria_disabled = next_btn.attr("aria-disabled")
            is_disabled_attr = next_btn.attr("disabled") is not None

            if "disabled" in class_str or is_disabled_attr or aria_disabled == "true":
                log("OK", "🏁 下一页按钮已被禁用,说明已经到达最后一页!")
                break

            page_num += 1
            log("INFO", f"正在点击【下一页】抓取第 {page_num} 页...")

            try:
                next_btn.click()
            except Exception as e:
                log("ERR", f"普通点击失败: {e},尝试 JS 点击...")
                page.run_js("arguments[0].click();", next_btn)

            # Wait for the API response of the new page.
            packet = page.listen.wait(timeout=15)
            if not packet:
                log("ERR", f"{page_num} 页请求超时或未触发,中止抓取。")
                break

            body = packet.response.body
            data = body if isinstance(body, (dict, list)) else json.loads(body)

            if isinstance(data, dict) and "result" in data:
                items = data["result"].get("items", [])
                if not items:
                    log("WARN", f"{page_num} 页返回了空列表,可能已无数据。")
                    break

                all_clean_items.extend(_clean_receipt_item(item) for item in items)
                log("OK", f"{page_num} 页清洗完成,累计提取 {len(all_clean_items)} 条数据。")

                # Checkpoint every 10 pages so a crash does not lose everything.
                if page_num % 10 == 0:
                    _write_receipt_json(SAVE_PATH, all_clean_items)
                    log("INFO", f"💾 自动存档: 已保存 {len(all_clean_items)} 条记录至本地。")
            else:
                log("ERR", f"{page_num} 页数据结构异常,中止。")
                break

        # =========================================================
        # Final save
        # =========================================================
        if all_clean_items:
            _write_receipt_json(SAVE_PATH, all_clean_items)
            log("OK", f"🎉 全部抓取完成!总计成功提取 {len(all_clean_items)} 条数据。")
            log("OK", f"数据已保存至: {SAVE_PATH}")

    except Exception as e:
        log("ERR", f"发生全局异常: {e}")
        # Try to rescue what has been collected so far.
        if all_clean_items:
            rescue_path = OUTPUT_DIR / "receipt_details_RESCUE.json"
            _write_receipt_json(rescue_path, all_clean_items)
            log("INFO", f"🆘 触发异常保存,抢救了 {len(all_clean_items)} 条数据。")
    finally:
        # Stop the listener on every exit path once it has been started.
        if listening:
            page.listen.stop()
if __name__ == "__main__":
    # Run the full receipt-details scrape when executed as a script.
    fetch_receipt_details_full()

View File

@@ -0,0 +1,252 @@
"""
收货明细报表 - 智能增量同步脚本
目标:
1. 自动连接本地 SQLite 数据库查询当前存量。
2. 进入 ERP 系统截获第一页 API提取系统总条数。
3. 精准计算需要跳转的起始页码,并在前端页面自动完成跳转。
4. 仅提取新增页面的数据,内存去重后插入 SQLite绝不重复抓取历史数据。
"""
import sys
import json
import time
import math
import random
import sqlite3
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import DB_PATH
# Start page from which the menu navigation begins.
HOME_URL = "https://yunmes.tftykj.cn/"
# Target handed to page.listen.start() to capture the report's API responses.
API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy"
def get_local_count(conn):
    """Return the number of rows currently stored in ``receipt_details``."""
    (row_total,) = conn.cursor().execute("SELECT COUNT(*) FROM receipt_details").fetchone()
    return row_total
def item_exists(cursor, item):
    """Tell whether a receipt row is already stored in ``receipt_details``.

    A row is identified by purchase order code + row number + material code,
    read from the ``purchaseOrderCode``, ``rowsNum`` and ``materialCode`` keys
    of *item*.

    The comparison uses SQLite's ``IS`` operator instead of ``=`` so that a
    missing (``None``/NULL) key part matches a stored NULL; with ``=`` such
    rows could never be found again.

    Returns:
        ``True`` when a matching row exists, otherwise ``False``.
    """
    key = (
        item.get("purchaseOrderCode"),
        item.get("rowsNum"),
        item.get("materialCode"),
    )
    cursor.execute('''
        SELECT 1 FROM receipt_details
        WHERE purchase_order_code IS ? AND row_no IS ? AND material_code IS ?
    ''', key)
    return cursor.fetchone() is not None
def _click_with_js_fallback(page, ele):
    """Click *ele* natively; if that raises, click it through JavaScript instead."""
    try:
        ele.click()
    except Exception:
        page.run_js("arguments[0].click();", ele)


def _upsert_receipt_row(cursor, item):
    """Update the stored row matching *item*, or insert it when none exists.

    Rows are matched on purchase order code + row number + material code.
    ``IS`` is used for the match so that NULL key parts compare equal and do
    not produce a fresh duplicate on every run.

    Returns:
        ``True`` when a new row was inserted, ``False`` when an existing row
        was updated.
    """
    po_code = item.get("purchaseOrderCode")
    row_no = item.get("rowsNum")
    mat_code = item.get("materialCode")

    cursor.execute(
        'SELECT id FROM receipt_details WHERE purchase_order_code IS ? AND row_no IS ? AND material_code IS ?',
        (po_code, row_no, mat_code),
    )
    existing_record = cursor.fetchone()

    # Prefer the converted quantities; fall back to the plain ones on None.
    p_qty = item.get("convertPlannedPurchaseQuantity")
    if p_qty is None:
        p_qty = item.get("plannedPurchaseQuantity")
    r_qty = item.get("convertGoodsQuantity")
    if r_qty is None:
        r_qty = item.get("goodsQuantity")

    if existing_record:
        cursor.execute('''
            UPDATE receipt_details
            SET purchase_qty = ?, receive_qty = ?, receive_price = ?, total_amount = ?
            WHERE id = ?
        ''', (p_qty, r_qty, item.get("receivePrice"), item.get("receiveAmount"), existing_record[0]))
        return False

    cursor.execute('''
        INSERT INTO receipt_details (
            purchase_order_code, row_no, material_code, material_name,
            material_specification, warehouse_code, warehouse_name,
            supplier_code, supplier_name, unit_name, conversion_unit,
            receive_price, receipt_time,
            purchase_qty, receive_qty, total_amount
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        po_code,
        row_no,
        mat_code,
        item.get("materialName"),
        item.get("materialSpecification"),
        item.get("warehouseCode"),
        item.get("warehouseName"),
        item.get("supplierCode"),
        item.get("supplierName"),
        item.get("unitName"),
        item.get("convertUnitName"),
        item.get("receivePrice"),
        item.get("receiptTime"),
        p_qty,
        r_qty,
        item.get("receiveAmount")
    ))
    return True


def fetch_receipt_details_incremental():
    """Fetch only the report pages that are new compared with the local database.

    The local row count of ``receipt_details`` is compared with the
    ``totalCount`` reported by the first captured API response.  When the
    remote side has more rows, the page jumper is used to go to the first
    page that is not completely stored locally (50 rows per page are
    assumed), and every page from there to the last one is upserted into
    SQLite, committing once per page.

    The function returns ``None``.  Errors are logged; the database
    connection and the network listener are released on every exit path.
    """
    log("INFO", "=== 🚀 启动收货明细报表 - 智能增量同步 ===")

    if not DB_PATH.exists():
        log("ERR", f"找不到数据库文件: {DB_PATH},请先执行全量导入!")
        return

    page_size = 50  # rows per report page, as assumed by the page arithmetic below
    conn = sqlite3.connect(DB_PATH)
    page = None

    try:
        # Both calls sit inside the try so that the connection is closed even
        # when the table is missing or the browser cannot be reached.
        local_count = get_local_count(conn)
        log("INFO", f"📦 本地数据库当前总计: {local_count} 条数据")

        page = get_page(port=9222)

        log("INFO", f"正在回到主页起点: {HOME_URL}")
        page.get(HOME_URL)
        page.wait.load_start()
        time.sleep(2)

        menus = [
            ("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'),
            ("第二层: 采购业务报表", 'xpath:/html/body/div[7]/div/div[1]/div/div[4]/div/p'),
            ("第三层: 收货明细报表", 'xpath:/html/body/div[8]/div/div[1]/div/div[4]/div/p')
        ]

        log("INFO", "模拟点击左侧导航菜单...")
        for name, xpath in menus:
            ele = page.ele(xpath, timeout=5)
            if ele:
                _click_with_js_fallback(page, ele)
                time.sleep(1.5)
            else:
                log("ERR", f"找不到菜单元素: {name}")
                return

        log("OK", "✅ 成功点开收货明细报表界面!")

        # Hide the menu again by clicking an empty area.
        blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div'
        blank_ele = page.ele(blank_xpath, timeout=3)
        if blank_ele:
            _click_with_js_fallback(page, blank_ele)
            time.sleep(0.5)

        log("INFO", f"开启底层数据拦截网: {API_TARGET}")
        page.listen.start(API_TARGET)

        packet = page.listen.wait(timeout=10)
        if not packet:
            # Nothing captured yet: trigger the request via the query button.
            query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span'
            query_btn = page.ele(query_btn_xpath, timeout=3)
            if query_btn:
                _click_with_js_fallback(page, query_btn)
                packet = page.listen.wait(timeout=15)

        if not packet:
            log("ERR", "未能拦截到第一页数据,无法获取线上总条数。")
            return

        body = packet.response.body
        data = body if isinstance(body, (dict, list)) else json.loads(body)

        remote_count = 0
        if isinstance(data, dict) and "result" in data:
            remote_count = data["result"].get("totalCount", 0)

        log("INFO", f"🌐 线上 ERP 系统当前总条数: {remote_count}")

        if remote_count <= local_count:
            log("OK", "🎉 本地数据已是最新状态,无需抓取!")
            return

        new_items_count = remote_count - local_count
        log("INFO", f"🔥 发现新增数据: {new_items_count} 条!准备进行增量跳页抓取...")

        # With 50 rows per page: e.g. 37584 local rows fill 751 pages
        # completely, so fetching starts at page 752.
        start_page = math.floor(local_count / page_size) + 1
        end_page = math.ceil(remote_count / page_size)

        log("INFO", f"🎯 智能跳页计算完毕:直接跳转至第 {start_page} 页 (目标到 {end_page} 页)")

        # Jump to the start page through the pager's input box.
        if start_page > 1:
            jumper_input_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/span[3]/div/div//input'
            input_ele = page.ele(jumper_input_xpath, timeout=5)

            if not input_ele:
                # Fallback locator for the page-number input.
                jumper_input_xpath = 'xpath://input[@type="number" and @aria-label=""]'
                input_ele = page.ele(jumper_input_xpath, timeout=5)

            if input_ele:
                input_ele.clear()
                input_ele.input(str(start_page))
                time.sleep(0.5)
                input_ele.input('\n')

                # Wait for the response that belongs to the target page.
                packet = page.listen.wait(timeout=15)
                if not packet:
                    log("ERR", "跳转失败,未拦截到目标页的数据请求。")
                    return
                log("OK", f"✅ 成功跳转至第 {start_page} 页并截获数据!")
            else:
                log("ERR", "找不到页码输入框,增量跳转失败!")
                return

        # =========================================================
        # Store the rows of every new page
        # =========================================================
        current_page = start_page
        cursor = conn.cursor()
        total_inserted = 0

        while current_page <= end_page:
            body = packet.response.body
            data = body if isinstance(body, (dict, list)) else json.loads(body)

            inserted_this_page = 0
            if isinstance(data, dict) and "result" in data:
                items = data["result"].get("items", [])

                for item in items:
                    # The per-page figure counts updates as well (it only
                    # feeds the log line); the grand total counts inserts only.
                    if _upsert_receipt_row(cursor, item):
                        total_inserted += 1
                    inserted_this_page += 1

                conn.commit()
                log("OK", f"{current_page} 页处理完毕,成功入库 {inserted_this_page} 条新数据。")

            # Click through to the next page while there is one.
            if current_page < end_page:
                delay = random.uniform(1.5, 3.5)
                log("INFO", f"⏳ 停顿 {delay:.2f} 秒后点击下一页...")
                time.sleep(delay)

                next_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[2]/div/div[2]/div[1]/button[2]'
                next_btn = page.ele(next_btn_xpath, timeout=5)
                if next_btn:
                    _click_with_js_fallback(page, next_btn)

                    packet = page.listen.wait(timeout=15)
                    if not packet:
                        log("ERR", f"{current_page + 1} 页请求超时!")
                        break
                else:
                    log("ERR", "找不到下一页按钮!")
                    break

            current_page += 1

        log("OK", f"🎉 增量同步大功告成!总计入库 {total_inserted} 条全新数据!")

    except Exception as e:
        log("ERR", f"发生全局异常: {e}")
    finally:
        conn.close()
        # page stays None when the failure happened before the browser was attached.
        if page is not None:
            page.listen.stop()
if __name__ == "__main__":
    # Run the incremental sync when this file is executed as a script.
    fetch_receipt_details_incremental()

View File

@@ -0,0 +1,227 @@
import sqlite3
import json
from pathlib import Path
import os
from config import OUTPUT_DIR, DB_PATH
# JSON input files read by the import functions below.
RECEIPT_JSON = OUTPUT_DIR / "receipt_details_full_clean.json"
BOM_JSON = OUTPUT_DIR / "bom_cost_full_tree_final.json"
def init_db():
    """Open the SQLite database at ``DB_PATH`` and make sure the schema exists.

    ``receipt_details`` and its indexes are created only when missing.  The
    two BOM tables are dropped and recreated on every call, so existing BOM
    rows are discarded.

    Returns:
        The open ``sqlite3.Connection`` (the caller is responsible for
        closing it).

    Raises:
        Whatever ``sqlite3`` raises while creating the schema; the connection
        is closed before the error is re-raised so it does not leak.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        # Receipt details table.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS receipt_details (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                purchase_order_code TEXT,
                row_no INTEGER,
                material_code TEXT,
                material_name TEXT,
                material_specification TEXT,
                warehouse_code TEXT,
                warehouse_name TEXT,
                supplier_code TEXT,
                supplier_name TEXT,
                unit_name TEXT,
                conversion_unit TEXT,
                receive_price REAL,
                receipt_time TEXT,
                purchase_qty REAL,
                receive_qty REAL,
                total_amount REAL
            )
        ''')

        # Indexes that speed up lookups on the receipt details table.
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_material_code ON receipt_details(material_code)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_supplier_name ON receipt_details(supplier_name)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_time ON receipt_details(receipt_time)')

        # BOM cost tables: rebuilt from scratch (child first, it references the parent).
        cursor.execute('DROP TABLE IF EXISTS bom_child')
        cursor.execute('DROP TABLE IF EXISTS bom_parent')

        cursor.execute('''
            CREATE TABLE bom_parent (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                parent_material_code TEXT UNIQUE,
                parent_material_name TEXT
            )
        ''')

        # Child rows form a tree stored as an adjacency list: every node
        # records the id of its parent node.
        cursor.execute('''
            CREATE TABLE bom_child (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                parent_material_code TEXT, -- 归属的最顶层父件
                node_material_code TEXT,
                node_material_name TEXT,
                bom_level INTEGER,
                parent_node_id INTEGER, -- 指向上一级子件的 id如果是一级子件则为空
                usage_qty REAL DEFAULT 1.0,
                FOREIGN KEY(parent_material_code) REFERENCES bom_parent(parent_material_code),
                FOREIGN KEY(parent_node_id) REFERENCES bom_child(id)
            )
        ''')

        cursor.execute('CREATE INDEX IF NOT EXISTS idx_bom_child_parent_code ON bom_child(parent_material_code)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_bom_child_node_code ON bom_child(node_material_code)')

        conn.commit()
    except Exception:
        # Do not hand back (or leak) a half-initialised connection.
        conn.close()
        raise
    return conn
def import_receipt_details(conn):
    """Rebuild ``receipt_details`` from the records stored in ``RECEIPT_JSON``.

    When the JSON file is missing a message is printed and nothing else
    happens.  Otherwise the table is dropped, recreated together with its
    indexes, filled with one row per JSON record and committed.

    Args:
        conn: Open SQLite connection to import into.
    """
    if not RECEIPT_JSON.exists():
        print(f"找不到收货明细文件: {RECEIPT_JSON}")
        return

    print("开始导入收货明细数据...")
    with open(RECEIPT_JSON, 'r', encoding='utf-8') as f:
        records = json.load(f)

    cur = conn.cursor()

    # Start from an empty table with the current layout so the import can be re-run.
    cur.execute('DROP TABLE IF EXISTS receipt_details')
    cur.execute('''
        CREATE TABLE receipt_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            purchase_order_code TEXT,
            row_no INTEGER,
            material_code TEXT,
            material_name TEXT,
            material_specification TEXT,
            warehouse_code TEXT,
            warehouse_name TEXT,
            supplier_code TEXT,
            supplier_name TEXT,
            unit_name TEXT,
            conversion_unit TEXT,
            receive_price REAL,
            receipt_time TEXT,
            purchase_qty REAL,
            receive_qty REAL,
            total_amount REAL
        )
    ''')
    for index_ddl in (
        'CREATE INDEX IF NOT EXISTS idx_receipt_material_code ON receipt_details(material_code)',
        'CREATE INDEX IF NOT EXISTS idx_receipt_supplier_name ON receipt_details(supplier_name)',
        'CREATE INDEX IF NOT EXISTS idx_receipt_time ON receipt_details(receipt_time)',
    ):
        cur.execute(index_ddl)

    # JSON keys listed in the same order as the columns of the INSERT below.
    source_keys = (
        "采购订单号", "行号", "物料代码", "物料名称",
        "物料规格", "仓库代码", "仓库名称",
        "供应商代码", "供应商名称", "单位名称", "转换单位",
        "收货单价", "收货时间",
        "进货数量", "收货数量", "收货总金额",
    )
    insert_sql = '''
        INSERT INTO receipt_details (
            purchase_order_code, row_no, material_code, material_name,
            material_specification, warehouse_code, warehouse_name,
            supplier_code, supplier_name, unit_name, conversion_unit,
            receive_price, receipt_time,
            purchase_qty, receive_qty, total_amount
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''

    imported = 0
    for record in records:
        cur.execute(insert_sql, tuple(record.get(key) for key in source_keys))
        imported += 1

    conn.commit()
    print(f"成功导入 {imported} 条收货明细数据!")
def _insert_bom_tree(cursor, parent_material_code, tree_nodes, parent_node_id=None):
    """Recursively insert the nodes of a nested BOM tree into ``bom_child``.

    Args:
        cursor: SQLite cursor used for the inserts.
        parent_material_code: Code of the top-level parent the tree belongs to.
        tree_nodes: List of node dicts; children are read from ``sub_items``.
        parent_node_id: ``bom_child.id`` of the node one level up, or ``None``
            for first-level nodes.

    A missing, ``None`` or empty-string ``usageQty`` is stored as ``1.0``.
    Every other value, including an explicit ``0``, is stored as given;
    falling back on mere falsiness would silently turn a zero quantity
    into ``1.0``.
    """
    for node in tree_nodes:
        raw_qty = node.get("usageQty")
        usage_qty = 1.0 if raw_qty in (None, "") else float(raw_qty)

        cursor.execute('''
            INSERT INTO bom_child (
                parent_material_code, node_material_code, node_material_name, bom_level, parent_node_id, usage_qty
            ) VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            parent_material_code,
            node.get("childMaterialCode"),
            node.get("childMaterialName"),
            node.get("bomLevel"),
            parent_node_id,
            usage_qty,
        ))

        # The id of the row just written becomes parent_node_id of its children.
        current_node_id = cursor.lastrowid

        sub_items = node.get("sub_items", [])
        if sub_items:
            _insert_bom_tree(cursor, parent_material_code, sub_items, current_node_id)
def import_bom_data(conn):
    """Load the nested BOM cost data from ``BOM_JSON`` into the BOM tables.

    When the JSON file is missing a message is printed and nothing else
    happens.  Otherwise both BOM tables are emptied, every parent with a
    non-empty code is inserted together with its tree (via
    ``_insert_bom_tree``), and the transaction is committed.  An
    ``sqlite3.IntegrityError`` raised while handling a parent is reported as
    a duplicate and that parent is skipped.

    Args:
        conn: Open SQLite connection to import into.
    """
    if not BOM_JSON.exists():
        print(f"找不到 BOM 成本文件: {BOM_JSON}")
        return

    print("开始导入 BOM 成本数据...")
    with open(BOM_JSON, 'r', encoding='utf-8') as f:
        parents = json.load(f)

    cur = conn.cursor()

    # Remove whatever an earlier import left behind.
    cur.execute('DELETE FROM bom_child')
    cur.execute('DELETE FROM bom_parent')

    imported_parents = 0
    for entry in parents:
        code = entry.get("parentMaterialCode")
        # Parents without a code are ignored.
        if code:
            try:
                cur.execute('''
                    INSERT INTO bom_parent (parent_material_code, parent_material_name)
                    VALUES (?, ?)
                ''', (code, entry.get("parentMaterialName")))
                imported_parents += 1

                # Walk the tree that hangs below this parent.
                subtree = entry.get("bom_cost_tree", [])
                if subtree:
                    _insert_bom_tree(cur, code, subtree, parent_node_id=None)
            except sqlite3.IntegrityError:
                print(f"警告: 父件重复 {code},跳过")

    conn.commit()

    # Report how many child nodes ended up in the table.
    child_total = cur.execute('SELECT COUNT(*) FROM bom_child').fetchone()[0]
    print(f"成功导入 {imported_parents} 个 BOM 父件,包含 {child_total} 个子件节点!")
if __name__ == "__main__":
    # Script entry point: create the schema, import both JSON files, then
    # close the connection.
    print(f"数据库文件将保存在: {DB_PATH}")
    conn = init_db()
    import_receipt_details(conn)
    import_bom_data(conn)
    conn.close()
    print("全部导入完成!你可以使用 SQLite 客户端连接 erp_data.db 查看数据。")

View File

@@ -0,0 +1,51 @@
"""
ERP 浏览器保活服务 (手工填写账号密码专用)
运行此脚本后,在弹出的 Chrome 浏览器中手工登录。
登录成功后,不要关闭终端和浏览器。
其他抓取脚本(指定同一端口)就可以直接复用这个已经登录的浏览器实例了!
"""
import sys
import time
from pathlib import Path
# 引入现有的登录模块
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, login_manual, log
def main():
    """Run the browser keep-alive service on port 9222.

    A page is obtained through ``get_page`` and handed to ``login_manual``.
    When that reports success the function sleeps in an endless loop so the
    browser stays available; when it reports failure, on Ctrl+C, or on any
    other exception the browser is closed with ``page.quit()``.
    """
    log("INFO", "=== 🚀 启动浏览器保活服务 (端口: 9222) ===")

    # All scraping scripts are expected to connect to this same port.
    page = get_page(port=9222)

    try:
        log("INFO", "等待您在弹出的浏览器中完成手工登录...")

        # login_manual opens the site and waits for the manual login.
        if not login_manual(page):
            log("ERR", "❌ 登录超时或失败,保活服务即将退出。")
            page.quit()
            return

        log("OK", "✅ 登录成功!浏览器已进入保活状态。")
        for notice in (
            "==================================================",
            "⚠️ 请勿关闭此终端窗口和弹出的 Chrome 浏览器!",
            "👉 现在您可以新开一个终端,去运行其他的抓取脚本了。",
            "🛑 如果要结束保活关闭浏览器,请在此终端按 Ctrl+C。",
            "==================================================",
        ):
            log("INFO", notice)

        # Stay alive until the user presses Ctrl+C in the terminal.
        while True:
            time.sleep(10)

    except KeyboardInterrupt:
        log("INFO", "接收到退出信号 (Ctrl+C),正在关闭浏览器...")
        page.quit()
    except Exception as e:
        log("ERR", f"发生异常: {e}")
        page.quit()
if __name__ == "__main__":
    # Start the keep-alive service when this file is executed as a script.
    main()