# Files
# datie-bom/browser_login/fetch_bom_cost_full_tree.py

# 251 lines
# 10 KiB
# Python
# Raw Blame History

# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
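BOM cost - full tree scraping script (all 1400+ parent parts on the site, with child parts nested up to 5 levels deep)
Goals:
1. Collect every parent part (from the cost-accounting list page)
2. Issue in-page background requests for the BOM cost data of each parent (a flat list covering 5 levels)
3. Clean the data on the fly and rebuild it into a properly nested JSON tree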
"""
import sys
import json
import time
import subprocess
import random
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import OUTPUT_DIR
# Page that lists the parent parts; opened in the browser before paging through it.
PAGE_URL = "https://yunmes.tftykj.cn/PartBomCostAccounting"
# Name passed to the network listener to capture the parent-list API responses.
API_PARENT = "PartBomCostAccounting_SearchList_Proxy"
# Path of the final JSON file the nested BOM cost trees are saved to.
TREE_FILE_PATH = OUTPUT_DIR / "bom_cost_full_tree_final.json"
def build_nested_tree(flat_items):
    """Convert a flat BOM item list into a nested tree.

    Every usable item is a dict identified by ``sonNO`` that refers to its
    parent through ``_parentId``; a ``_parentId`` of ``None`` marks a
    top-level node. Only a fixed set of fields is copied into the output, and
    the two linking fields are stripped again once the tree is assembled.

    Args:
        flat_items: Iterable of item dicts (or a falsy value). Entries that
            are not dicts, or that have a falsy ``sonNO``, are skipped.

    Returns:
        A list of root node dicts. Each node carries ``bomLevel``,
        ``childMaterialCode``, ``childMaterialName`` and ``usageQty``
        (taken from ``childrenMaterialConsumption``, defaulting to 1), plus a
        ``sub_items`` list only when it has children. Returns ``[]`` for
        empty input.
    """
    if not flat_items:
        return []

    # Pass 1: index the cleaned nodes by their sonNO.
    node_dict = {}
    for item in flat_items:
        # Null or malformed entries would crash on .get(); skip them instead.
        if not isinstance(item, dict):
            continue
        son_no = item.get("sonNO")
        if not son_no:
            continue
        node_dict[son_no] = {
            "sonNO": son_no,
            "_parentId": item.get("_parentId"),
            "bomLevel": item.get("bomLevel"),
            "childMaterialCode": item.get("childMaterialCode"),
            "childMaterialName": item.get("childMaterialName"),
            "usageQty": item.get("childrenMaterialConsumption", 1),
            "sub_items": [],
        }

    # Pass 2: attach every node to its parent, or collect it as a root.
    roots = []
    for node in node_dict.values():
        parent_id = node["_parentId"]
        parent_node = None if parent_id is None else node_dict.get(parent_id)
        if parent_node is None or parent_node is node:
            # No parent, a parent missing from the list, or a self-reference:
            # keep the node as a root so it is not silently lost.
            roots.append(node)
        else:
            parent_node["sub_items"].append(node)

    def strip_temp_fields(nodes):
        """Remove the linking fields and drop empty ``sub_items`` lists."""
        for node in nodes:
            node.pop("sonNO", None)
            node.pop("_parentId", None)
            if node["sub_items"]:
                strip_temp_fields(node["sub_items"])
            else:
                node.pop("sub_items", None)

    strip_temp_fields(roots)
    return roots
# XPath of the "next page" button inside the parent list's pager.
_NEXT_BTN_XPATH = "xpath:/html/body/div[1]/div/div[3]/table/tbody/tr/td[10]/a/span/span[2]"

# JavaScript run inside the already-open page: it posts to the BOM cost tree
# endpoint through the page's own jQuery and resolves with a
# {status, data} object. The two *_PLACEHOLDER tokens are substituted per parent.
_BOM_TREE_JS_TEMPLATE = """
return new Promise((resolve, reject) => {
if (typeof $ !== 'undefined' && $.ajax) {
$.ajax({
url: '/api/services/TfTechApi/PartBom/PartBom_SearchByTreeCost',
type: 'POST',
data: {
materialId: MATERIAL_ID_PLACEHOLDER,
partBomCostAccountingId: ACCOUNTING_ID_PLACEHOLDER,
childMaterialCode: '',
childMaterialName: '',
childMaterialSpecification: '',
childMaterialModel: ''
},
headers: {
'referer': 'https://yunmes.tftykj.cn/PartBomCostAccounting/Detail?id=ACCOUNTING_ID_PLACEHOLDER'
},
success: function(response) {
resolve({status: 'success', data: response});
},
error: function(xhr, status, error) {
resolve({status: 'error', data: xhr.responseText || error});
}
});
} else {
resolve({status: 'error', data: 'No jQuery'});
}
});
"""


def _collect_parents(page):
    """Page through the parent list and return one record per parent part."""
    parents = []
    log("INFO", f"正在访问安全的父件页面: {PAGE_URL}")
    page.get(PAGE_URL)
    page.wait.load_start()
    log("INFO", f"开启父件 API 网络监听: {API_PARENT}")
    page.listen.start(API_PARENT)
    try:
        # Reload so the first page's API call happens while the listener is active.
        page.refresh()
        current_page = 1
        while True:
            log("INFO", f"等待第 {current_page} 页父件 API 响应...")
            packet = page.listen.wait(timeout=20)
            if not packet:
                log("ERR", f"超时未收到第 {current_page} 页数据,父件扫荡结束。")
                break

            body = packet.response.body
            data = body if isinstance(body, (dict, list)) else json.loads(body)
            if not (isinstance(data, dict) and "result" in data):
                break

            result_data = data.get("result") or {}
            # Fall back to empty/zero when the keys are present but null.
            items = result_data.get("items") or []
            total_records = result_data.get("totalCount") or 0
            for item in items:
                if not item:
                    continue
                parents.append({
                    # partBomCostAccountingId for the tree request.
                    "_id": item.get("id"),
                    # Sent as materialId in the tree request.
                    "_materialId": item.get("parentMaterialId"),
                    "parentMaterialCode": item.get("parentMaterialCode"),
                    "parentMaterialName": item.get("parentMaterialName"),
                    # Filled in later with the nested tree.
                    "bom_cost_tree": [],
                })
            log("OK", f"提取了 {len(items)} 个父件。总进度: {len(parents)}/{total_records}")
            if len(parents) >= total_records or len(items) == 0:
                break

            # Move to the next page, stopping when the button is missing or disabled.
            next_btn = page.ele(_NEXT_BTN_XPATH, timeout=5)
            if not next_btn:
                log("WARN", "未找到下一页按钮,停止翻页。")
                break
            parent_a = next_btn.parent(2)
            # attr() may yield None when the element has no class attribute.
            if parent_a and "disabled" in (parent_a.attr("class") or ""):
                log("INFO", "已到达最后一页。")
                break
            page.run_js("arguments[0].click();", next_btn)
            time.sleep(1.5)
            current_page += 1
    finally:
        # Stop listening even when paging fails part-way through.
        page.listen.stop()
    return parents


def _fetch_tree_for_parent(page, parent):
    """Request one parent's flat BOM rows and store the nested tree on it."""
    js_code = (
        _BOM_TREE_JS_TEMPLATE
        .replace("MATERIAL_ID_PLACEHOLDER", str(parent.get("_materialId")))
        .replace("ACCOUNTING_ID_PLACEHOLDER", str(parent.get("_id")))
    )
    result = page.run_js(js_code)
    if not (result and result.get('status') == 'success'):
        log("ERR", f" └── 请求失败: {result.get('data') if result else '未知错误'}")
        return

    data = result.get('data')
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except ValueError:
            # Not JSON text; the shape check below reports it.
            pass

    flat_items = data.get("result") if isinstance(data, dict) else None
    if isinstance(flat_items, list):
        parent["bom_cost_tree"] = build_nested_tree(flat_items)
        log("OK", f" └── 成功重组了一棵包含 {len(flat_items)} 个节点的多层树。")
    else:
        log("WARN", " └── 响应中未找到有效的 BOM 列表,已跳过该父件。")


def _save_progress(processed_parents):
    """Write the given parents to TREE_FILE_PATH without the request-only ids."""
    clean_save_list = [
        {key: value for key, value in parent.items() if key not in ("_id", "_materialId")}
        for parent in processed_parents
    ]
    with open(TREE_FILE_PATH, "w", encoding="utf-8") as f:
        json.dump(clean_save_list, f, ensure_ascii=False, indent=2)


def _sync_to_sqlite():
    """Run the sibling import_to_sqlite.py script and log its outcome."""
    log("INFO", "⏳ 正在自动将 JSON 数据同步至 SQLite 数据库...")
    try:
        import_script = Path(__file__).parent / "import_to_sqlite.py"
        # sys.executable keeps the child process on the current interpreter.
        completed = subprocess.run(
            [sys.executable, str(import_script), "--bom-only"],
            capture_output=True,
            text=True,
        )
        if completed.returncode == 0:
            log("OK", "✅ 数据库同步成功!")
            print(completed.stdout)
        else:
            log("ERR", f"❌ 数据库同步失败: {completed.stderr}")
    except Exception as db_err:
        log("ERR", f"❌ 自动触发入库脚本失败: {db_err}")


def fetch_bom_cost_tree():
    """Scrape every parent part and its nested BOM cost tree, then save and import.

    Phase 1 pages through the parent list while listening for the list API
    responses. Phase 2 requests each parent's flat BOM rows from inside the
    page, rebuilds them with ``build_nested_tree`` and rewrites the JSON file
    after every 10 parents and after the last one. Finally the SQLite import
    script is run. Takes no arguments and returns ``None``; any exception
    raised during the phases is written to ``error.log``, printed and logged
    rather than propagated.
    """
    log("INFO", "=== 🌳 启动 BOM 成本终极抓取 (多层嵌套自动重组) ===")
    page = get_page(port=9222)
    try:
        # Phase 1: collect the basic record of every parent part.
        parents = _collect_parents(page)

        # Phase 2: fetch and rebuild the nested tree for each parent.
        total = len(parents)
        log("INFO", f"=== 🚀 开始为 {total} 个父件抓取 BOM 成本树 ===")
        for index, parent in enumerate(parents):
            done = index + 1
            if parent.get("_id") and parent.get("_materialId"):
                parent_code = parent.get("parentMaterialCode", "未知")
                log("INFO", f"[{done}/{total}] 正在请求 BOM 成本树 (Code: {parent_code})...")
                _fetch_tree_for_parent(page, parent)
                # Random pause between requests.
                time.sleep(random.uniform(0.3, 0.7))
            # The save check runs for every parent, including ones skipped for
            # missing ids, so the final save cannot be bypassed.
            if done % 10 == 0 or done == total:
                _save_progress(parents[:done])
                log("INFO", f"💾 进度已实时保存至 JSON ({done}/{total})")

        log("OK", f"=== 🏆 终极 BOM 成本多层树状抓取完成!文件路径: {TREE_FILE_PATH} ===")
        # After scraping, import the JSON into SQLite.
        _sync_to_sqlite()
    except Exception as e:
        import traceback
        err_msg = f"发生异常: {e}\n{traceback.format_exc()}"
        # Explicit UTF-8 so the non-ASCII message cannot fail to encode.
        with open("error.log", "w", encoding="utf-8") as f:
            f.write(err_msg)
        print(err_msg, flush=True)
        log("ERR", err_msg)
# Run the full scrape when this file is executed as a script.
if __name__ == "__main__":
    fetch_bom_cost_tree()