Files
datie-bom/web_ui/app.py
2026-06-12 16:30:57 +08:00

1284 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, render_template, jsonify, request
import sqlite3
import subprocess
from pathlib import Path
import sys
import os
import io
import contextlib
import logging
import threading
import time
import webbrowser
from apscheduler.schedulers.background import BackgroundScheduler
def get_base_dir():
"""获取程序运行时的基础目录(静态资源、脚本代码等只读文件存放处)"""
if getattr(sys, 'frozen', False):
return Path(sys._MEIPASS)
return Path(__file__).parent.parent
def get_data_dir():
"""获取持久化数据存放目录(数据库、输出文件等,保证重启不丢失)"""
if getattr(sys, 'frozen', False):
return Path(os.path.dirname(sys.executable))
return Path(__file__).parent.parent
BASE_DIR = get_base_dir()
DATA_DIR = get_data_dir()
# Web 资源路径 (在 PyInstaller _MEIPASS 目录中)
TEMPLATE_DIR = BASE_DIR / "web_ui" / "templates"
STATIC_DIR = BASE_DIR / "web_ui" / "static"
app = Flask(__name__, template_folder=str(TEMPLATE_DIR), static_folder=str(STATIC_DIR))
# 数据库路径:指向外部持久化数据目录,确保数据不丢失
DB_PATH = DATA_DIR / "browser_login" / "output" / "erp_data.db"
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
# 抓取脚本目录 (属于代码文件,从 BASE_DIR 加载)
BROWSER_LOGIN_DIR = BASE_DIR / "browser_login"
# 全局互斥锁机制:保证同一时间只有一个任务在控制浏览器
browser_lock = threading.Lock()
is_browser_busy = False
current_task_name = ""
task_logs = []
class WebLogger:
def __init__(self, orig):
self.orig = orig
def write(self, text):
self.orig.write(text)
msg = text.strip()
if msg and is_browser_busy:
task_logs.append(msg)
if len(task_logs) > 500:
task_logs.pop(0)
def flush(self):
self.orig.flush()
sys.stdout = WebLogger(sys.stdout)
sys.stderr = WebLogger(sys.stderr)
def set_browser_busy(task_name):
"""设置浏览器为忙碌状态"""
global is_browser_busy, current_task_name, task_logs
with browser_lock:
is_browser_busy = True
current_task_name = task_name
task_logs.clear()
def release_browser():
"""释放浏览器控制权"""
global is_browser_busy, current_task_name
with browser_lock:
is_browser_busy = False
current_task_name = ""
def auto_init_db():
"""如果是新环境首次运行,自动初始化数据库表结构"""
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
try:
from import_to_sqlite import init_db
conn = init_db()
conn.close()
except Exception as e:
print(f"⚠️ 初始化数据库表结构失败: {e}")
# 在应用启动前执行初始化
auto_init_db()
def background_sync_job():
"""APScheduler 后台定时任务执行增量抓取"""
global is_browser_busy
print("[定时任务] 正在执行后台增量数据同步...")
# 检查浏览器是否被其他任务占用
if is_browser_busy:
print(f"[定时任务] ⚠️ 浏览器当前正被 '{current_task_name}' 占用,本次增量同步跳过。")
return
set_browser_busy("定时后台增量同步")
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
try:
from fetch_receipt_details_incremental import fetch_receipt_details_incremental
# 重定向输出,避免污染终端,同时捕获可能的信息
f = io.StringIO()
with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
fetch_receipt_details_incremental()
print(f"[定时任务] 同步完成。")
except Exception as e:
print(f"[定时任务] 同步失败: {str(e)}")
finally:
release_browser()
# 初始化定时调度器
scheduler = BackgroundScheduler()
scheduler.add_job(func=background_sync_job, trigger="interval", minutes=30)
# 启动调度器
scheduler.start()
# 确保在应用退出时关闭调度器
import atexit
atexit.register(lambda: scheduler.shutdown())
def get_db_connection():
"""获取数据库连接"""
conn = sqlite3.connect(DB_PATH)
# 将查询结果转换为字典形式
conn.row_factory = sqlite3.Row
return conn
@app.route('/')
def index():
"""渲染导航主页"""
return render_template('home.html')
@app.route('/receipts')
def receipts_page():
"""渲染收货明细数据看板"""
return render_template('index.html')
@app.route('/work_orders')
def work_orders_page():
"""渲染生产工单数据看板"""
return render_template('work_orders.html')
@app.route('/abnormal_report')
def abnormal_report_page():
"""渲染发料异常检查数据看板"""
return render_template('abnormal_report.html')
@app.route('/reconciliation')
def reconciliation_page():
"""渲染绩效核查与BOM比对看板"""
return render_template('reconciliation.html')
@app.route('/api/receipts')
def get_receipts():
"""获取收货明细数据(支持分页和多条件搜索)"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
offset = (page - 1) * limit
# 获取搜索参数
supplier_name = request.args.get('supplier_name', '').strip()
material_name = request.args.get('material_name', '').strip()
po_code = request.args.get('po_code', '').strip()
material_code = request.args.get('material_code', '').strip()
conn = get_db_connection()
# 构建动态 SQL 查询
query_conditions = []
params = []
if supplier_name:
query_conditions.append("supplier_name LIKE ?")
params.append(f"%{supplier_name}%")
if material_name:
query_conditions.append("material_name LIKE ?")
params.append(f"%{material_name}%")
if po_code:
query_conditions.append("purchase_order_code LIKE ?")
params.append(f"%{po_code}%")
if material_code:
query_conditions.append("material_code LIKE ?")
params.append(f"%{material_code}%")
where_clause = ""
if query_conditions:
where_clause = " WHERE " + " AND ".join(query_conditions)
# 获取总数
count_query = f"SELECT COUNT(*) FROM receipt_details{where_clause}"
total = conn.execute(count_query, params).fetchone()[0]
# 获取分页数据
data_query = f"SELECT * FROM receipt_details{where_clause} ORDER BY receipt_time DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
receipts = conn.execute(data_query, params).fetchall()
conn.close()
return jsonify({
"total": total,
"rows": [dict(ix) for ix in receipts]
})
@app.route('/api/work_orders')
def get_work_orders():
"""获取生产工单数据(支持分页和多条件搜索)"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
offset = (page - 1) * limit
# 获取搜索参数
wo_number = request.args.get('work_orders_number', '').strip()
material_name = request.args.get('material_name', '').strip()
material_code = request.args.get('material_code', '').strip()
conn = get_db_connection()
# 构建动态 SQL 查询
query_conditions = []
params = []
if wo_number:
query_conditions.append("work_orders_number LIKE ?")
params.append(f"%{wo_number}%")
if material_name:
query_conditions.append("material_name LIKE ?")
params.append(f"%{material_name}%")
if material_code:
query_conditions.append("material_code LIKE ?")
params.append(f"%{material_code}%")
where_clause = ""
if query_conditions:
where_clause = " WHERE " + " AND ".join(query_conditions)
# 获取总数
count_query = f"SELECT COUNT(*) FROM work_orders{where_clause}"
total = conn.execute(count_query, params).fetchone()[0]
# 获取分页数据
data_query = f"SELECT * FROM work_orders{where_clause} ORDER BY execution_time DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
orders = conn.execute(data_query, params).fetchall()
conn.close()
return jsonify({
"total": total,
"rows": [dict(ix) for ix in orders]
})
@app.route('/api/abnormal_report')
def get_abnormal_report():
"""获取发料异常报表数据(支持分页和多条件搜索)"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
offset = (page - 1) * limit
# 获取搜索参数
wo_number = request.args.get('work_orders_number', '').strip()
material_code = request.args.get('material_code', '').strip()
issue_status = request.args.get('issue_status', '').strip()
conn = get_db_connection()
# 构建动态 SQL 查询
query_conditions = []
params = []
if wo_number:
query_conditions.append("work_orders_number LIKE ?")
params.append(f"%{wo_number}%")
if material_code:
query_conditions.append("material_code LIKE ?")
params.append(f"%{material_code}%")
if issue_status:
query_conditions.append("issue_status = ?")
params.append(issue_status)
where_clause = ""
if query_conditions:
where_clause = " WHERE " + " AND ".join(query_conditions)
# 获取总数
count_query = f"SELECT COUNT(*) FROM abnormal_report{where_clause}"
total = conn.execute(count_query, params).fetchone()[0]
# 获取分页数据 (默认按工单号倒序排列)
data_query = f"SELECT * FROM abnormal_report{where_clause} ORDER BY work_orders_number DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
orders = conn.execute(data_query, params).fetchall()
conn.close()
return jsonify({
"total": total,
"rows": [dict(ix) for ix in orders]
})
@app.route('/api/analysis/match_work_orders', methods=['POST'])
def match_work_orders():
"""触发:第一步提取并匹配工单"""
global is_browser_busy
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
def run_match_task():
set_browser_busy("自动清洗并匹配工单数据")
try:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
from analysis_service import MaterialReconciliationService
service = MaterialReconciliationService()
updated = service.step1_extract_and_match_work_orders()
print(f"✅ 工单匹配任务执行成功!共处理了 {updated} 条发料记录。")
except Exception as e:
print(f"❌ 自动清洗匹配失败: {e}")
finally:
release_browser()
try:
threading.Thread(target=run_match_task, daemon=True).start()
return jsonify({"success": True, "message": "已触发后台自动清洗匹配工单任务!"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/analysis/unmatched_materials')
def get_unmatched_materials():
"""获取:第二步没有匹配到工单的物料明细"""
try:
start_date = request.args.get('start')
end_date = request.args.get('end')
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
from analysis_service import MaterialReconciliationService
service = MaterialReconciliationService()
data = service.step2_get_unmatched_materials(start_date, end_date)
return jsonify({"total": len(data), "rows": data})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/analysis/bom_reconciliation')
def get_bom_reconciliation():
"""获取第三步比对发料与BOM差异"""
try:
start_date = request.args.get('start')
end_date = request.args.get('end')
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
from analysis_service import MaterialReconciliationService
service = MaterialReconciliationService()
data = service.step3_bom_reconciliation(start_date, end_date)
# 支持前端筛选
status_filter = request.args.get('status', '')
if status_filter:
data = [r for r in data if r['status'] == status_filter]
return jsonify({"total": len(data), "rows": data})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/analysis/summary')
def get_analysis_summary():
"""获取当前对账数据的周期等汇总信息"""
try:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
from analysis_service import MaterialReconciliationService
service = MaterialReconciliationService()
period = service.get_data_period()
return jsonify(period)
except Exception as e:
return jsonify({"start": "-", "end": "-"}), 500
@app.route('/api/task_status')
def get_task_status():
"""获取当前浏览器控制任务的状态"""
return jsonify({
"is_busy": is_browser_busy,
"task_name": current_task_name
})
@app.route('/api/task_logs')
def get_task_logs():
"""获取实时日志"""
return jsonify({"logs": task_logs})
@app.route('/api/sync_receipts', methods=['POST'])
def sync_receipts():
"""触发后台运行收货明细增量抓取脚本(修复跨机器路径问题)"""
global is_browser_busy
import sys
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
def run_receipt_sync():
set_browser_busy("手动收货明细增量同步")
try:
from fetch_receipt_details_incremental import fetch_receipt_details_incremental
fetch_receipt_details_incremental()
except Exception as e:
print(f"手动增量同步失败: {e}")
finally:
release_browser()
try:
# 将长时间运行的抓取任务放入后台线程,防止前端请求超时
threading.Thread(target=run_receipt_sync, daemon=True).start()
return jsonify({
"success": True,
"message": "增量同步任务已在后台启动!请观察黑框控制台的运行日志。",
"logs": "任务已在后台运行..."
})
except ImportError:
return jsonify({"success": False, "message": "找不到增量抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/api/sync_issue_receipts', methods=['POST'])
def sync_issue_receipts():
"""触发后台运行发料单明细增量抓取脚本"""
global is_browser_busy
import sys
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
def run_issue_receipt_sync():
set_browser_busy("手动发料单明细增量同步")
try:
from fetch_issue_receipt_incremental import fetch_issue_receipt_incremental
fetch_issue_receipt_incremental()
except Exception as e:
print(f"发料单明细增量同步失败: {e}")
finally:
release_browser()
try:
threading.Thread(target=run_issue_receipt_sync, daemon=True).start()
return jsonify({
"success": True,
"message": "发料单明细增量抓取任务已在后台启动!请观察运行日志。",
"logs": "任务已在后台运行..."
})
except ImportError:
return jsonify({"success": False, "message": "找不到发料单增量抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/api/sync_work_orders', methods=['POST'])
def sync_work_orders():
"""触发后台运行生产工单增量抓取脚本"""
global is_browser_busy
import sys
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
def run_work_orders_sync():
set_browser_busy("手动生产工单增量同步")
try:
from fetch_work_orders_incremental import fetch_work_orders_incremental
fetch_work_orders_incremental()
except Exception as e:
print(f"手动生产工单同步失败: {e}")
finally:
release_browser()
try:
threading.Thread(target=run_work_orders_sync, daemon=True).start()
return jsonify({
"success": True,
"message": "工单增量同步任务已在后台启动!请观察黑框控制台的运行日志。",
"logs": "任务已在后台运行..."
})
except ImportError:
return jsonify({"success": False, "message": "找不到增量抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/api/sync_abnormal_report', methods=['POST'])
def sync_abnormal_report():
"""触发后台运行异常报表抓取脚本"""
global is_browser_busy
import sys
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
def run_abnormal_report_sync():
set_browser_busy("手动发料异常报表抓取")
try:
import auto_fetch_abnormal_report
page = auto_fetch_abnormal_report.get_page(port=9222)
# 自动重试机制
while True:
try:
success = auto_fetch_abnormal_report.navigate_to_report(page)
if success:
result = auto_fetch_abnormal_report.fetch_report_data(page)
# 如果返回 False说明被踢回首页或发生中断需要重试
if result is False:
print("♻️ 检测到页面被系统踢回首页或中断,正在为您自动重新导航并恢复抓取...")
import time
time.sleep(3)
continue
else:
break
else:
print("❌ 导航进入报表失败,尝试重新导航...")
import time
time.sleep(3)
continue
except Exception as inner_e:
print(f"⚠️ 抓取循环中发生异常: {inner_e},尝试重新恢复...")
import time
time.sleep(3)
continue
except Exception as e:
print(f"❌ 手动发料异常报表抓取严重失败: {e}")
finally:
release_browser()
try:
threading.Thread(target=run_abnormal_report_sync, daemon=True).start()
return jsonify({
"success": True,
"message": "发料异常报表抓取任务已在后台启动!请观察黑框控制台的运行日志。",
"logs": "任务已在后台运行..."
})
except ImportError:
return jsonify({"success": False, "message": "找不到异常报表抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/api/sync_bom', methods=['POST'])
def sync_bom():
"""触发后台运行 BOM 成本全量抓取脚本(修复跨机器路径问题)"""
global is_browser_busy
import sys
if is_browser_busy:
return jsonify({
"success": False,
"message": f"系统忙碌:当前正在执行 '{current_task_name}',请稍后再试。"
}), 409
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
def run_bom_sync():
set_browser_busy("手动 BOM 全量树抓取")
try:
from fetch_bom_cost_full_tree import fetch_bom_cost_tree
fetch_bom_cost_tree()
except Exception as e:
import traceback
err_msg = f"BOM 抓取失败: {e}\n{traceback.format_exc()}"
print(err_msg)
with open("app_error.log", "w") as f:
f.write(err_msg)
finally:
release_browser()
try:
# 将长时间运行的抓取任务放入后台线程,防止前端请求超时
threading.Thread(target=run_bom_sync, daemon=True).start()
return jsonify({
"success": True,
"message": "BOM 树抓取任务已在后台启动!预计耗时 10-20 分钟,请观察黑框控制台的运行日志。",
"logs": "任务已在后台运行..."
})
except ImportError:
return jsonify({"success": False, "message": "找不到 BOM 抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/bom')
def bom_page():
"""渲染 BOM 成本树状图页面"""
return render_template('bom_tree.html')
@app.route('/api/bom_parents')
def get_bom_parents():
"""获取所有父件列表,供左侧菜单选择"""
keyword = request.args.get('keyword', '').strip()
conn = get_db_connection()
if keyword:
parents = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_name LIKE ? OR parent_material_code LIKE ? LIMIT 100',
(f'%{keyword}%', f'%{keyword}%')
).fetchall()
else:
parents = conn.execute('SELECT parent_material_code, parent_material_name FROM bom_parent LIMIT 100').fetchall()
conn.close()
return jsonify([dict(ix) for ix in parents])
def calculate_bom_cost(node, prices_dict):
"""
递归计算 BOM 树的成本。
如果该节点是叶子节点(底层采购件),直接从 prices_dict 取价格。
如果该节点是中间组件/父件,则成本为其所有子节点成本的总和。
注意:这里假设每个物料数量为 1。如果你的 ERP 数据里有 BOM 用量(quantity)字段,
应该用 `子件单价 * 用量` 进行累加。由于目前只抓了基本结构,先做单纯的层级累加。
"""
node_cost = 0.0
# 获取该物料的最新采购单价(默认为 0
price_record = prices_dict.get(node['materialCode'])
own_price = price_record['receive_price'] if price_record and isinstance(price_record['receive_price'], (int, float)) else 0.0
if not node.get('children'):
# 叶子节点,直接使用自己的采购价
node_cost = own_price
else:
# 有子件的组件,成本等于所有子件成本的累加
for child in node['children']:
node_cost += calculate_bom_cost(child, prices_dict)
# 兜底机制:如果在 ERP 里这是一个组件,但没查到子件成本(比如没抓全),
# 则尝试看看它自己有没有直接的采购记录
if node_cost == 0.0 and own_price > 0:
node_cost = own_price
# 把计算出来的汇总成本和自身单价都挂载到节点上,供前端使用
node['totalCost'] = round(node_cost, 2)
node['ownPrice'] = round(own_price, 2)
# 如果有采购记录,把时间和单号也带上
if price_record:
node['receiptTime'] = price_record['receipt_time']
node['poCode'] = price_record['purchase_order_code']
node['supplierName'] = price_record['supplier_name']
else:
node['receiptTime'] = '-'
node['poCode'] = '-'
node['supplierName'] = '-'
return node_cost
@app.route('/api/bom_tree/<parent_code>')
def get_bom_tree(parent_code):
"""根据父件代码,组装其下所有的子件,并计算整个 BOM 树的累加成本"""
conn = get_db_connection()
# 1. 查出该父件的基本信息作为根节点
parent_info = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_code = ?',
(parent_code,)
).fetchone()
if not parent_info:
conn.close()
return jsonify({"error": "找不到该父件"}), 404
# 2. 查出该家族的所有子件
children_records = conn.execute(
'SELECT id, node_material_code, node_material_name, parent_node_id, bom_level, usage_qty FROM bom_child WHERE parent_material_code = ?',
(parent_code,)
).fetchall()
# 3. 提前批量查出这棵树里所有物料的【最新采购价】
# 避免前端每次点击都去查一次,也为了在后端能直接算好总成本
material_codes = [row['node_material_code'] for row in children_records]
material_codes.append(parent_info['parent_material_code'])
prices_dict = {}
if material_codes:
# 使用 IN 语句批量查询,并按物料分组取最新时间的价格
placeholders = ','.join(['?'] * len(material_codes))
# SQLite 分组取最新记录的安全写法(使用窗口函数 ROW_NUMBER
price_records = conn.execute(f'''
WITH RankedRecords AS (
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
receipt_time, purchase_order_code, supplier_name,
ROW_NUMBER() OVER(PARTITION BY material_code ORDER BY receipt_time DESC, id DESC) as rn
FROM receipt_details
WHERE material_code IN ({placeholders})
)
SELECT material_code, receive_price, receipt_time, purchase_order_code, supplier_name
FROM RankedRecords
WHERE rn = 1
''', material_codes).fetchall()
for r in price_records:
prices_dict[r['material_code']] = dict(r)
conn.close()
# 为不同层级预设好看的主题色(从父件 0 到 第 5 层)
level_colors = {
0: '#F56C6C', # 红色:顶级父件成品
1: '#E6A23C', # 橙色:一级子件(大组件)
2: '#67C23A', # 绿色:二级子件
3: '#409EFF', # 蓝色:三级子件
4: '#909399', # 灰色:四级子件
5: '#DDA0DD' # 紫色:五级或更深子件
}
# 4. 开始在 Python 内存中“找爸爸”组装树
nodes_map = {}
for row in children_records:
node_id = row['id']
level = row['bom_level']
nodes_map[node_id] = {
"name": f"[{row['node_material_code']}]\n{row['node_material_name']}",
"materialCode": row['node_material_code'],
"materialName": row['node_material_name'],
"bomLevel": level,
"parentNodeId": row['parent_node_id'],
"usageQty": row['usage_qty'],
# 基础颜色配置,稍后根据是否有子节点调整实心/空心
"itemStyle": {
"color": level_colors.get(level, '#DDA0DD'),
"borderColor": level_colors.get(level, '#DDA0DD')
},
"children": []
}
root_tree = {
"name": f"[{parent_info['parent_material_code']}]\n{parent_info['parent_material_name']}",
"materialCode": parent_info['parent_material_code'],
"materialName": parent_info['parent_material_name'],
"bomLevel": 0,
"usageQty": 1.0,
"itemStyle": {
"color": level_colors[0],
"borderColor": level_colors[0]
},
"children": []
}
for node_id, node_data in nodes_map.items():
parent_id = node_data['parentNodeId']
if parent_id is None:
root_tree['children'].append(node_data)
else:
if parent_id in nodes_map:
nodes_map[parent_id]['children'].append(node_data)
# 5. 递归处理:计算树的成本,并且根据是否有子件修改圆圈的实心/空心样式
def process_tree_nodes(node, prices_dict):
node_cost = 0.0
price_record = prices_dict.get(node['materialCode'])
own_price = price_record['receive_price'] if price_record and isinstance(price_record['receive_price'], (int, float)) else 0.0
has_children = bool(node.get('children'))
code = node.get('materialCode', '')
# 【核心视觉区分:还能不能点开?】
# 如果它有子件(还能点开展开),就让它变成实心的(填充颜色与边框一致)
# 如果它是最底层的零件(没有子件了,点不开了),就让它变成空心的(填充白色,只保留彩色边框)
if has_children:
node['itemStyle']['color'] = node['itemStyle']['borderColor']
# 给可以点击的节点加个发光效果
node['itemStyle']['shadowBlur'] = 10
node['itemStyle']['shadowColor'] = 'rgba(0, 0, 0, 0.2)'
else:
node['itemStyle']['color'] = '#FFFFFF' # 白色填充(空心)
node['itemStyle']['borderWidth'] = 2 # 加粗彩色边框
# 乘以该节点的耗用量,得到该节点的总成本贡献
usage_qty = float(node.get('usageQty', 1.0))
if not has_children:
node_cost = own_price
node_cost = node_cost * usage_qty
else:
for child in node['children']:
node_cost += process_tree_nodes(child, prices_dict)
if node_cost == 0.0 and own_price > 0:
node_cost = own_price
# 【工序件特殊处理逻辑】
# 如果当前节点是 2 或 3 开头的加工工序件,由于物理上是同一个东西的流转,
# BOM 表中的用量只是为了匹配底层的原材料用量,不能再重复相乘。
# 所以强行忽略 2 或 3 开头工序件的自身 BOM 用量,直接 1:1 继承其子件汇总的成本。
if code.startswith('2') or code.startswith('3'):
# 保持 node_cost 不变,相当于乘以 1
pass
else:
# 正常的装配组件,需要将子件汇总成本乘以自身用量
node_cost = node_cost * usage_qty
node['totalCost'] = round(node_cost, 2)
node['ownPrice'] = round(own_price, 2)
if price_record:
node['receiptTime'] = price_record['receipt_time']
node['poCode'] = price_record['purchase_order_code']
node['supplierName'] = price_record['supplier_name']
else:
node['receiptTime'] = '-'
node['poCode'] = '-'
node['supplierName'] = '-'
return node_cost
process_tree_nodes(root_tree, prices_dict)
return jsonify(root_tree)
@app.route('/api/latest_price/<material_code>')
def get_latest_price(material_code):
"""根据物料代码,去收货明细表中查询最新一次的收货价格和时间"""
conn = get_db_connection()
# 按时间倒序,取第一条(最新一条)
latest_record = conn.execute('''
SELECT ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
receipt_time, purchase_order_code, supplier_name
FROM receipt_details
WHERE material_code = ?
ORDER BY receipt_time DESC
LIMIT 1
''', (material_code,)).fetchone()
conn.close()
if latest_record:
return jsonify(dict(latest_record))
else:
return jsonify({"receive_price": "无采购记录", "receipt_time": "-", "purchase_order_code": "-", "supplier_name": "-"})
@app.route('/compare')
def compare_page():
"""渲染 BOM 成本期间对比分析页面"""
return render_template('bom_compare.html')
def build_bom_compare_tree(parent_code, start_a, end_a, start_b, end_b):
"""构建用于成本对比分析的 BOM 树数据"""
conn = get_db_connection()
# 1. 查出基本结构
parent_info = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_code = ?',
(parent_code,)
).fetchone()
if not parent_info:
conn.close()
return None, "找不到该父件"
children_records = conn.execute(
'SELECT id, node_material_code, node_material_name, parent_node_id, bom_level, usage_qty FROM bom_child WHERE parent_material_code = ?',
(parent_code,)
).fetchall()
material_codes = [row['node_material_code'] for row in children_records]
material_codes.append(parent_info['parent_material_code'])
# 获取唯一的物料代码列表以进行查询
unique_codes = list(set(material_codes))
prices_dict = {code: {
'latest_price': 0.0,
'unit_name': '',
'casting_weight': None,
'period_a_price': 0.0, 'period_a_status': 'normal', # normal, fallback(黄色), missing(红色)
'period_b_price': 0.0, 'period_b_status': 'normal'
} for code in unique_codes}
if unique_codes:
placeholders = ','.join(['?'] * len(unique_codes))
# --- A. 查询历史最新价格及铸件重量/单位 ---
latest_records = conn.execute(f'''
WITH RankedRecords AS (
SELECT material_code,
unit_name,
purchase_qty,
receive_qty,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
ROW_NUMBER() OVER(PARTITION BY material_code ORDER BY receipt_time DESC, id DESC) as rn
FROM receipt_details
WHERE material_code IN ({placeholders})
)
SELECT material_code, unit_name, purchase_qty, receive_qty, receive_price
FROM RankedRecords
WHERE rn = 1
''', unique_codes).fetchall()
for r in latest_records:
code = r['material_code']
prices_dict[code]['latest_price'] = float(r['receive_price'] or 0)
prices_dict[code]['unit_name'] = r['unit_name'] or ''
# 计算铸件重量逻辑
if code.startswith('1PZJ') and r['purchase_qty'] and float(r['purchase_qty']) > 0:
casting_weight = float(r['receive_qty'] or 0) / float(r['purchase_qty'])
prices_dict[code]['casting_weight'] = round(casting_weight, 4) # 保留4位小数更精确
# 辅助函数:处理某个期间的价格逻辑
def process_period(start_date, end_date, period_key, status_key):
if not start_date or not end_date:
return
# 给结束日期加上 23:59:59确保包含最后一天全天的数据
end_date_full = f"{end_date} 23:59:59"
# 1. 先查询该期间内的最新收货价
params = unique_codes + [start_date, end_date_full]
period_latest_records = conn.execute(f'''
WITH RankedRecords AS (
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
ROW_NUMBER() OVER(PARTITION BY material_code ORDER BY receipt_time DESC, id DESC) as rn
FROM receipt_details
WHERE material_code IN ({placeholders})
AND receipt_time >= ? AND receipt_time <= ?
)
SELECT material_code, receive_price
FROM RankedRecords
WHERE rn = 1
''', params).fetchall()
found_codes = set()
for r in period_latest_records:
code = r['material_code']
prices_dict[code][period_key] = float(r['receive_price'] or 0)
prices_dict[code][status_key] = 'normal'
found_codes.add(code)
# 2. 对于在期间内没有采购记录的物料,触发降级策略(找期间之前的最新价)
missing_codes = [c for c in unique_codes if c not in found_codes]
if missing_codes:
missing_ph = ','.join(['?'] * len(missing_codes))
fallback_params = missing_codes + [start_date]
fallback_records = conn.execute(f'''
WITH RankedRecords AS (
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
ROW_NUMBER() OVER(PARTITION BY material_code ORDER BY receipt_time DESC, id DESC) as rn
FROM receipt_details
WHERE material_code IN ({missing_ph})
AND receipt_time < ?
)
SELECT material_code, receive_price
FROM RankedRecords
WHERE rn = 1
''', fallback_params).fetchall()
fallback_found = set()
for r in fallback_records:
code = r['material_code']
prices_dict[code][period_key] = float(r['receive_price'] or 0)
prices_dict[code][status_key] = 'fallback' # 标记为使用了历史最近价
fallback_found.add(code)
for code in missing_codes:
if code not in fallback_found:
prices_dict[code][period_key] = 0.0
if prices_dict[code].get('latest_price', 0) > 0:
prices_dict[code][status_key] = 'fallback'
else:
prices_dict[code][status_key] = 'missing'
# --- B. 处理期间 A ---
process_period(start_a, end_a, 'period_a_price', 'period_a_status')
# --- C. 处理期间 B ---
process_period(start_b, end_b, 'period_b_price', 'period_b_status')
conn.close()
# 4. 组装树结构
nodes_map = {}
for row in children_records:
node_id = row['id']
nodes_map[node_id] = {
"id": node_id,
"materialCode": row['node_material_code'],
"materialName": row['node_material_name'],
"bomLevel": row['bom_level'],
"usageQty": row['usage_qty'],
"parentNodeId": row['parent_node_id'],
"unitName": "",
"castingWeight": None,
"children": []
}
root_tree = {
"id": "root",
"materialCode": parent_info['parent_material_code'],
"materialName": parent_info['parent_material_name'],
"bomLevel": "0",
"usageQty": 1.0,
"unitName": "",
"castingWeight": None,
"children": []
}
for node_id, node_data in nodes_map.items():
parent_id = node_data['parentNodeId']
if parent_id is None:
root_tree['children'].append(node_data)
else:
if parent_id in nodes_map:
nodes_map[parent_id]['children'].append(node_data)
# 5. 递归计算累加成本
def calc_compare_costs(node):
code = node['materialCode']
usage_qty = float(node.get('usageQty', 1.0))
price_info = prices_dict.get(code, {})
# 获取基础单价
node['latestUnitPrice'] = price_info.get('latest_price', 0.0)
node['periodAUnitPrice'] = price_info.get('period_a_price', 0.0)
node['periodBUnitPrice'] = price_info.get('period_b_price', 0.0)
# 获取单位名称和铸件单件重量
node['unitName'] = price_info.get('unit_name', '')
node['castingWeight'] = price_info.get('casting_weight')
node['periodAStatus'] = price_info.get('period_a_status', 'missing')
node['periodBStatus'] = price_info.get('period_b_status', 'missing')
has_children = bool(node.get('children'))
if not has_children:
if code and code.startswith('1PZJ') and node.get('castingWeight'):
multiplier = usage_qty * float(node['castingWeight'])
node['calcType'] = 'casting'
else:
multiplier = usage_qty
node['calcType'] = 'normal'
node['totalLatest'] = node['latestUnitPrice'] * multiplier
node['totalPeriodA'] = node['periodAUnitPrice'] * multiplier
node['totalPeriodB'] = node['periodBUnitPrice'] * multiplier
node['showAStatus'] = node['periodAStatus']
node['showBStatus'] = node['periodBStatus']
else:
total_latest = 0.0
total_a = 0.0
total_b = 0.0
for child in node['children']:
calc_compare_costs(child)
total_latest += child['totalLatest']
total_a += child['totalPeriodA']
total_b += child['totalPeriodB']
# 【工序件特殊处理逻辑】
# 同样在成本对比页面2或3开头的工序件不能重复乘以用量直接继承子件成本
if code.startswith('2') or code.startswith('3'):
node['totalLatest'] = total_latest if total_latest > 0 else node['latestUnitPrice']
node['totalPeriodA'] = total_a if total_a > 0 else node['periodAUnitPrice']
node['totalPeriodB'] = total_b if total_b > 0 else node['periodBUnitPrice']
else:
node['totalLatest'] = (total_latest if total_latest > 0 else node['latestUnitPrice']) * usage_qty
node['totalPeriodA'] = (total_a if total_a > 0 else node['periodAUnitPrice']) * usage_qty
node['totalPeriodB'] = (total_b if total_b > 0 else node['periodBUnitPrice']) * usage_qty
node['showAStatus'] = 'normal'
node['showBStatus'] = 'normal'
calc_compare_costs(root_tree)
return root_tree, None
@app.route('/api/bom_tree_compare/<parent_code>')
def get_bom_tree_compare(parent_code):
"""
为成本对比页面提供带时间段价格分析的 BOM 树数据。
"""
start_a = request.args.get('start_a')
end_a = request.args.get('end_a')
start_b = request.args.get('start_b')
end_b = request.args.get('end_b')
root_tree, error = build_bom_compare_tree(parent_code, start_a, end_a, start_b, end_b)
if error:
return jsonify({"error": error}), 404
# 为了配合 ElementUI 的树形表格,根节点必须包装在数组里
return jsonify([root_tree])
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from flask import send_file
@app.route('/api/export_compare/<parent_code>')
def export_compare(parent_code):
start_a = request.args.get('start_a')
end_a = request.args.get('end_a')
start_b = request.args.get('start_b')
end_b = request.args.get('end_b')
root_tree, error = build_bom_compare_tree(parent_code, start_a, end_a, start_b, end_b)
if error:
return jsonify({"error": error}), 404
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "BOM成本对比"
# 定义样式
header_fill = PatternFill(start_color="409EFF", end_color="409EFF", fill_type="solid")
header_font = Font(color="FFFFFF", bold=True)
center_align = Alignment(horizontal="center", vertical="center")
border = Border(left=Side(style='thin'), right=Side(style='thin'),
top=Side(style='thin'), bottom=Side(style='thin'))
headers = [
"物料代码", "物料名称", "BOM层级", "用量", "单位",
"最新单价", "最新总成本",
f"期间A单价 ({start_a or ''}{end_a or ''})", f"期间A总成本",
f"期间B单价 ({start_b or ''}{end_b or ''})", f"期间B总成本",
"单价差异 (B-A)", "总成本差异 (B-A)"
]
ws.append(headers)
for col_idx, cell in enumerate(ws[1], 1):
cell.fill = header_fill
cell.font = header_font
cell.alignment = center_align
cell.border = border
ws.column_dimensions[get_column_letter(col_idx)].width = 15
ws.column_dimensions['A'].width = 18
ws.column_dimensions['B'].width = 30
ws.column_dimensions['H'].width = 25
ws.column_dimensions['J'].width = 25
def write_node_to_excel(node, level=0):
# 差异计算逻辑(同前端)
period_a_unit = node.get('periodAUnitPrice', 0) or 0
period_b_unit = node.get('periodBUnitPrice', 0) or 0
diff_unit = period_b_unit - period_a_unit
period_a_total = node.get('totalPeriodA', 0) or 0
period_b_total = node.get('totalPeriodB', 0) or 0
diff_total = period_b_total - period_a_total
prefix = " " * level
row_data = [
node.get('materialCode', ''),
prefix + node.get('materialName', ''),
node.get('bomLevel', ''),
node.get('usageQty', 1),
node.get('unitName', ''),
node.get('latestUnitPrice', 0),
node.get('totalLatest', 0),
period_a_unit,
period_a_total,
period_b_unit,
period_b_total,
diff_unit,
diff_total
]
ws.append(row_data)
current_row = ws.max_row
# 简单格式化
for col_idx, cell in enumerate(ws[current_row], 1):
cell.border = border
if col_idx in [6, 7, 8, 9, 10, 11, 12, 13]:
cell.number_format = '0.00'
# 递归子节点
if node.get('children'):
for child in node['children']:
write_node_to_excel(child, level + 1)
write_node_to_excel(root_tree)
output = io.BytesIO()
wb.save(output)
output.seek(0)
filename = f"BOM成本对比_{parent_code}.xlsx"
return send_file(
output,
as_attachment=True,
download_name=filename,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
def open_browser(port):
"""延迟 1.5 秒后自动打开系统默认浏览器"""
time.sleep(1.5)
url = f"http://127.0.0.1:{port}"
print(f"🚀 正在自动打开浏览器: {url}")
webbrowser.open(url)
def find_free_port(start_port=5050, max_port=5100):
"""自动寻找未被占用的端口,避免 Windows 10013 端口被拒错误"""
import socket
for port in range(start_port, max_port):
# 尝试通过创建 socket 并监听来确认端口是否真的可用
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 开启端口复用,避免 TIME_WAIT 状态导致误判
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
# 绑定到 127.0.0.1 而不是 0.0.0.0
s.bind(('127.0.0.1', port))
return port
except OSError:
continue
# 如果 5050-5100 都被占用或被拦截,尝试让系统随机分配一个端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 0))
return s.getsockname()[1]
if __name__ == '__main__':
# 这里是专门给开发环境python app.py使用的
# 生产环境 Gunicorn 不会执行这部分代码,而是直接导入 app 变量
# 检测是否是通过 PyInstaller 打包运行
is_frozen = getattr(sys, 'frozen', False)
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
try:
port = find_free_port()
except Exception as e:
print(f"⚠️ 无法自动获取端口,回退到默认端口: {e}")
port = 5050
# 启动前开启一个线程去拉起浏览器
threading.Thread(target=open_browser, args=(port,), daemon=True).start()
print("🚀 前端展示后端服务已启动!")
print(f"👉 本机访问地址: http://127.0.0.1:{port}")
else:
port = 5050
app.run(
debug=not is_frozen,
host='0.0.0.0',
port=port,
threaded=True,
use_reloader=False
)