Files
datie-bom/web_ui/app.py
2026-04-27 15:24:41 +08:00

684 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, render_template, jsonify, request
import sqlite3
import subprocess
from pathlib import Path
import sys
import os
import io
import contextlib
import logging
import threading
import time
import webbrowser
from apscheduler.schedulers.background import BackgroundScheduler
def get_base_dir():
"""获取程序运行时的基础目录(静态资源、脚本代码等只读文件存放处)"""
if getattr(sys, 'frozen', False):
return Path(sys._MEIPASS)
return Path(__file__).parent.parent
def get_data_dir():
"""获取持久化数据存放目录(数据库、输出文件等,保证重启不丢失)"""
if getattr(sys, 'frozen', False):
return Path(os.path.dirname(sys.executable))
return Path(__file__).parent.parent
BASE_DIR = get_base_dir()
DATA_DIR = get_data_dir()
# Web 资源路径 (在 PyInstaller _MEIPASS 目录中)
TEMPLATE_DIR = BASE_DIR / "web_ui" / "templates"
STATIC_DIR = BASE_DIR / "web_ui" / "static"
app = Flask(__name__, template_folder=str(TEMPLATE_DIR), static_folder=str(STATIC_DIR))
# 数据库路径:指向外部持久化数据目录,确保数据不丢失
DB_PATH = DATA_DIR / "browser_login" / "output" / "erp_data.db"
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
# 抓取脚本目录 (属于代码文件,从 BASE_DIR 加载)
BROWSER_LOGIN_DIR = BASE_DIR / "browser_login"
def background_sync_job():
"""APScheduler 后台定时任务执行增量抓取"""
print("[定时任务] 正在执行后台增量数据同步...")
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
try:
from fetch_receipt_details_incremental import fetch_receipt_details_incremental
# 重定向输出,避免污染终端,同时捕获可能的信息
f = io.StringIO()
with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
fetch_receipt_details_incremental()
print(f"[定时任务] 同步完成。")
except Exception as e:
print(f"[定时任务] 同步失败: {str(e)}")
# 初始化定时调度器
scheduler = BackgroundScheduler()
scheduler.add_job(func=background_sync_job, trigger="interval", minutes=30)
# 启动调度器
scheduler.start()
# 确保在应用退出时关闭调度器
import atexit
atexit.register(lambda: scheduler.shutdown())
def get_db_connection():
"""获取数据库连接"""
conn = sqlite3.connect(DB_PATH)
# 将查询结果转换为字典形式
conn.row_factory = sqlite3.Row
return conn
@app.route('/')
def index():
"""渲染导航主页"""
return render_template('home.html')
@app.route('/receipts')
def receipts_page():
"""渲染收货明细数据看板"""
return render_template('index.html')
@app.route('/api/receipts')
def get_receipts():
"""获取收货明细数据(支持分页和多条件搜索)"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
offset = (page - 1) * limit
# 获取搜索参数
supplier_name = request.args.get('supplier_name', '').strip()
material_name = request.args.get('material_name', '').strip()
po_code = request.args.get('po_code', '').strip()
conn = get_db_connection()
# 构建动态 SQL 查询
query_conditions = []
params = []
if supplier_name:
query_conditions.append("supplier_name LIKE ?")
params.append(f"%{supplier_name}%")
if material_name:
query_conditions.append("material_name LIKE ?")
params.append(f"%{material_name}%")
if po_code:
query_conditions.append("purchase_order_code LIKE ?")
params.append(f"%{po_code}%")
where_clause = ""
if query_conditions:
where_clause = " WHERE " + " AND ".join(query_conditions)
# 获取总数
count_query = f"SELECT COUNT(*) FROM receipt_details{where_clause}"
total = conn.execute(count_query, params).fetchone()[0]
# 获取分页数据
data_query = f"SELECT * FROM receipt_details{where_clause} ORDER BY receipt_time DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
receipts = conn.execute(data_query, params).fetchall()
conn.close()
return jsonify({
"total": total,
"rows": [dict(ix) for ix in receipts]
})
@app.route('/api/sync_receipts', methods=['POST'])
def sync_receipts():
"""触发后台运行收货明细增量抓取脚本(修复跨机器路径问题)"""
import sys
import io
import contextlib
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
try:
from fetch_receipt_details_incremental import fetch_receipt_details_incremental
# 捕获函数内部的 print/log 输出
f = io.StringIO()
with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
fetch_receipt_details_incremental()
logs = f.getvalue()
return jsonify({
"success": True,
"message": "增量同步执行完成",
"logs": logs
})
except ImportError:
return jsonify({"success": False, "message": "找不到增量抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/api/sync_bom', methods=['POST'])
def sync_bom():
"""触发后台运行 BOM 成本全量抓取脚本(修复跨机器路径问题)"""
import sys
import io
import contextlib
if str(BROWSER_LOGIN_DIR) not in sys.path:
sys.path.insert(0, str(BROWSER_LOGIN_DIR))
try:
from fetch_bom_cost_full_tree import fetch_bom_cost_tree
# 捕获函数内部的 print/log 输出
f = io.StringIO()
with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
fetch_bom_cost_tree()
logs = f.getvalue()
return jsonify({
"success": True,
"message": "BOM 同步执行完成",
"logs": logs
})
except ImportError:
return jsonify({"success": False, "message": "找不到 BOM 抓取脚本或导入失败"}), 404
except Exception as e:
return jsonify({"success": False, "message": f"系统错误: {str(e)}"}), 500
@app.route('/bom')
def bom_page():
"""渲染 BOM 成本树状图页面"""
return render_template('bom_tree.html')
@app.route('/api/bom_parents')
def get_bom_parents():
"""获取所有父件列表,供左侧菜单选择"""
keyword = request.args.get('keyword', '').strip()
conn = get_db_connection()
if keyword:
parents = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_name LIKE ? OR parent_material_code LIKE ? LIMIT 100',
(f'%{keyword}%', f'%{keyword}%')
).fetchall()
else:
parents = conn.execute('SELECT parent_material_code, parent_material_name FROM bom_parent LIMIT 100').fetchall()
conn.close()
return jsonify([dict(ix) for ix in parents])
def calculate_bom_cost(node, prices_dict):
"""
递归计算 BOM 树的成本。
如果该节点是叶子节点(底层采购件),直接从 prices_dict 取价格。
如果该节点是中间组件/父件,则成本为其所有子节点成本的总和。
注意:这里假设每个物料数量为 1。如果你的 ERP 数据里有 BOM 用量(quantity)字段,
应该用 `子件单价 * 用量` 进行累加。由于目前只抓了基本结构,先做单纯的层级累加。
"""
node_cost = 0.0
# 获取该物料的最新采购单价(默认为 0
price_record = prices_dict.get(node['materialCode'])
own_price = price_record['receive_price'] if price_record and isinstance(price_record['receive_price'], (int, float)) else 0.0
if not node.get('children'):
# 叶子节点,直接使用自己的采购价
node_cost = own_price
else:
# 有子件的组件,成本等于所有子件成本的累加
for child in node['children']:
node_cost += calculate_bom_cost(child, prices_dict)
# 兜底机制:如果在 ERP 里这是一个组件,但没查到子件成本(比如没抓全),
# 则尝试看看它自己有没有直接的采购记录
if node_cost == 0.0 and own_price > 0:
node_cost = own_price
# 把计算出来的汇总成本和自身单价都挂载到节点上,供前端使用
node['totalCost'] = round(node_cost, 2)
node['ownPrice'] = round(own_price, 2)
# 如果有采购记录,把时间和单号也带上
if price_record:
node['receiptTime'] = price_record['receipt_time']
node['poCode'] = price_record['purchase_order_code']
node['supplierName'] = price_record['supplier_name']
else:
node['receiptTime'] = '-'
node['poCode'] = '-'
node['supplierName'] = '-'
return node_cost
@app.route('/api/bom_tree/<parent_code>')
def get_bom_tree(parent_code):
"""根据父件代码,组装其下所有的子件,并计算整个 BOM 树的累加成本"""
conn = get_db_connection()
# 1. 查出该父件的基本信息作为根节点
parent_info = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_code = ?',
(parent_code,)
).fetchone()
if not parent_info:
conn.close()
return jsonify({"error": "找不到该父件"}), 404
# 2. 查出该家族的所有子件
children_records = conn.execute(
'SELECT id, node_material_code, node_material_name, parent_node_id, bom_level, usage_qty FROM bom_child WHERE parent_material_code = ?',
(parent_code,)
).fetchall()
# 3. 提前批量查出这棵树里所有物料的【最新采购价】
# 避免前端每次点击都去查一次,也为了在后端能直接算好总成本
material_codes = [row['node_material_code'] for row in children_records]
material_codes.append(parent_info['parent_material_code'])
prices_dict = {}
if material_codes:
# 使用 IN 语句批量查询,并按物料分组取最新时间的价格
placeholders = ','.join(['?'] * len(material_codes))
# SQLite 分组取最新记录的经典写法
price_records = conn.execute(f'''
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
receipt_time, purchase_order_code, supplier_name
FROM receipt_details
WHERE material_code IN ({placeholders})
GROUP BY material_code
HAVING receipt_time = MAX(receipt_time)
''', material_codes).fetchall()
for r in price_records:
prices_dict[r['material_code']] = dict(r)
conn.close()
# 为不同层级预设好看的主题色(从父件 0 到 第 5 层)
level_colors = {
0: '#F56C6C', # 红色:顶级父件成品
1: '#E6A23C', # 橙色:一级子件(大组件)
2: '#67C23A', # 绿色:二级子件
3: '#409EFF', # 蓝色:三级子件
4: '#909399', # 灰色:四级子件
5: '#DDA0DD' # 紫色:五级或更深子件
}
# 4. 开始在 Python 内存中“找爸爸”组装树
nodes_map = {}
for row in children_records:
node_id = row['id']
level = row['bom_level']
nodes_map[node_id] = {
"name": f"[{row['node_material_code']}]\n{row['node_material_name']}",
"materialCode": row['node_material_code'],
"materialName": row['node_material_name'],
"bomLevel": level,
"parentNodeId": row['parent_node_id'],
"usageQty": row['usage_qty'],
# 基础颜色配置,稍后根据是否有子节点调整实心/空心
"itemStyle": {
"color": level_colors.get(level, '#DDA0DD'),
"borderColor": level_colors.get(level, '#DDA0DD')
},
"children": []
}
root_tree = {
"name": f"[{parent_info['parent_material_code']}]\n{parent_info['parent_material_name']}",
"materialCode": parent_info['parent_material_code'],
"materialName": parent_info['parent_material_name'],
"bomLevel": 0,
"usageQty": 1.0,
"itemStyle": {
"color": level_colors[0],
"borderColor": level_colors[0]
},
"children": []
}
for node_id, node_data in nodes_map.items():
parent_id = node_data['parentNodeId']
if parent_id is None:
root_tree['children'].append(node_data)
else:
if parent_id in nodes_map:
nodes_map[parent_id]['children'].append(node_data)
# 5. 递归处理:计算树的成本,并且根据是否有子件修改圆圈的实心/空心样式
def process_tree_nodes(node, prices_dict):
node_cost = 0.0
price_record = prices_dict.get(node['materialCode'])
own_price = price_record['receive_price'] if price_record and isinstance(price_record['receive_price'], (int, float)) else 0.0
has_children = bool(node.get('children'))
# 【核心视觉区分:还能不能点开?】
# 如果它有子件(还能点开展开),就让它变成实心的(填充颜色与边框一致)
# 如果它是最底层的零件(没有子件了,点不开了),就让它变成空心的(填充白色,只保留彩色边框)
if has_children:
node['itemStyle']['color'] = node['itemStyle']['borderColor']
# 给可以点击的节点加个发光效果
node['itemStyle']['shadowBlur'] = 10
node['itemStyle']['shadowColor'] = 'rgba(0, 0, 0, 0.2)'
else:
node['itemStyle']['color'] = '#FFFFFF' # 白色填充(空心)
node['itemStyle']['borderWidth'] = 2 # 加粗彩色边框
if not has_children:
node_cost = own_price
else:
for child in node['children']:
node_cost += process_tree_nodes(child, prices_dict)
if node_cost == 0.0 and own_price > 0:
node_cost = own_price
# 乘以该节点的耗用量,得到该节点的总成本贡献
usage_qty = float(node.get('usageQty', 1.0))
node_cost = node_cost * usage_qty
node['totalCost'] = round(node_cost, 2)
node['ownPrice'] = round(own_price, 2)
if price_record:
node['receiptTime'] = price_record['receipt_time']
node['poCode'] = price_record['purchase_order_code']
node['supplierName'] = price_record['supplier_name']
else:
node['receiptTime'] = '-'
node['poCode'] = '-'
node['supplierName'] = '-'
return node_cost
process_tree_nodes(root_tree, prices_dict)
return jsonify(root_tree)
@app.route('/api/latest_price/<material_code>')
def get_latest_price(material_code):
"""根据物料代码,去收货明细表中查询最新一次的收货价格和时间"""
conn = get_db_connection()
# 按时间倒序,取第一条(最新一条)
latest_record = conn.execute('''
SELECT ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price,
receipt_time, purchase_order_code, supplier_name
FROM receipt_details
WHERE material_code = ?
ORDER BY receipt_time DESC
LIMIT 1
''', (material_code,)).fetchone()
conn.close()
if latest_record:
return jsonify(dict(latest_record))
else:
return jsonify({"receive_price": "无采购记录", "receipt_time": "-", "purchase_order_code": "-", "supplier_name": "-"})
@app.route('/compare')
def compare_page():
"""渲染 BOM 成本期间对比分析页面"""
return render_template('bom_compare.html')
@app.route('/api/bom_tree_compare/<parent_code>')
def get_bom_tree_compare(parent_code):
"""
为成本对比页面提供带时间段价格分析的 BOM 树数据。
前端需要传入两个时间段参数:
start_a, end_a (期间 A)
start_b, end_b (期间 B)
"""
start_a = request.args.get('start_a')
end_a = request.args.get('end_a')
start_b = request.args.get('start_b')
end_b = request.args.get('end_b')
conn = get_db_connection()
# 1. 查出基本结构
parent_info = conn.execute(
'SELECT parent_material_code, parent_material_name FROM bom_parent WHERE parent_material_code = ?',
(parent_code,)
).fetchone()
if not parent_info:
conn.close()
return jsonify({"error": "找不到该父件"}), 404
children_records = conn.execute(
'SELECT id, node_material_code, node_material_name, parent_node_id, bom_level, usage_qty FROM bom_child WHERE parent_material_code = ?',
(parent_code,)
).fetchall()
material_codes = [row['node_material_code'] for row in children_records]
material_codes.append(parent_info['parent_material_code'])
# 获取唯一的物料代码列表以进行查询
unique_codes = list(set(material_codes))
prices_dict = {code: {
'latest_price': 0.0,
'unit_name': '',
'casting_weight': None,
'period_a_price': 0.0, 'period_a_status': 'normal', # normal, fallback(黄色), missing(红色)
'period_b_price': 0.0, 'period_b_status': 'normal'
} for code in unique_codes}
if unique_codes:
placeholders = ','.join(['?'] * len(unique_codes))
# --- A. 查询历史最新价格及铸件重量/单位 ---
latest_records = conn.execute(f'''
SELECT material_code,
unit_name,
purchase_qty,
receive_qty,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price
FROM receipt_details
WHERE material_code IN ({placeholders})
GROUP BY material_code
HAVING receipt_time = MAX(receipt_time)
''', unique_codes).fetchall()
for r in latest_records:
code = r['material_code']
prices_dict[code]['latest_price'] = float(r['receive_price'] or 0)
prices_dict[code]['unit_name'] = r['unit_name'] or ''
# 计算铸件重量逻辑
if code.startswith('1PZJ') and r['purchase_qty'] and float(r['purchase_qty']) > 0:
casting_weight = float(r['receive_qty'] or 0) / float(r['purchase_qty'])
prices_dict[code]['casting_weight'] = round(casting_weight, 4) # 保留4位小数更精确
# 辅助函数:处理某个期间的价格逻辑
def process_period(start_date, end_date, period_key, status_key):
if not start_date or not end_date:
return
# 给结束日期加上 23:59:59确保包含最后一天全天的数据
end_date_full = f"{end_date} 23:59:59"
# 1. 先查询该期间内的最新收货价
params = unique_codes + [start_date, end_date_full]
period_latest_records = conn.execute(f'''
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price
FROM receipt_details
WHERE material_code IN ({placeholders})
AND receipt_time >= ? AND receipt_time <= ?
GROUP BY material_code
HAVING receipt_time = MAX(receipt_time)
''', params).fetchall()
found_codes = set()
for r in period_latest_records:
code = r['material_code']
prices_dict[code][period_key] = float(r['receive_price'] or 0)
prices_dict[code][status_key] = 'normal'
found_codes.add(code)
# 2. 对于在期间内没有采购记录的物料,触发降级策略(找期间之前的最新价)
missing_codes = [c for c in unique_codes if c not in found_codes]
if missing_codes:
missing_ph = ','.join(['?'] * len(missing_codes))
fallback_params = missing_codes + [start_date]
fallback_records = conn.execute(f'''
SELECT material_code,
ROUND(COALESCE(total_amount / NULLIF(receive_qty, 0), receive_price, 0), 2) AS receive_price
FROM receipt_details
WHERE material_code IN ({missing_ph})
AND receipt_time < ?
GROUP BY material_code
HAVING receipt_time = MAX(receipt_time)
''', fallback_params).fetchall()
fallback_found = set()
for r in fallback_records:
code = r['material_code']
prices_dict[code][period_key] = float(r['receive_price'] or 0)
prices_dict[code][status_key] = 'fallback' # 标记为使用了历史最近价
fallback_found.add(code)
# 3. 如果连期间之前的历史记录都没有,但它有基准价(说明是在期间之后才采购的),也算是 fallback 还是 missing
# 我们之前定义的是“无任何历史记录”才标红。所以如果它有最新基准价latest_price > 0
# 说明这东西买过,只是在 start_date 之前没买过。我们不应该把它标红missing而应该标为普通的没价格或者某种特殊颜色。
# 但为了严谨:既然我们在 start_date 之前没买过,意味着期初无库存/无价格,期间也没买,那这期间的价格只能是 0。
# 并且如果 latest_price > 0说明它并非“历史从未采购”我们把它标为 fallback 但价格是 0。
for code in missing_codes:
if code not in fallback_found:
prices_dict[code][period_key] = 0.0
if prices_dict[code].get('latest_price', 0) > 0:
# 有基准价,但在该期间和该期间之前都没买过,这不算“从未采购过”
prices_dict[code][status_key] = 'fallback'
else:
# 彻底查不到任何记录,才是真正的 missing
prices_dict[code][status_key] = 'missing'
# --- B. 处理期间 A ---
process_period(start_a, end_a, 'period_a_price', 'period_a_status')
# --- C. 处理期间 B ---
process_period(start_b, end_b, 'period_b_price', 'period_b_status')
conn.close()
# 4. 组装树结构
nodes_map = {}
for row in children_records:
node_id = row['id']
nodes_map[node_id] = {
"id": node_id,
"materialCode": row['node_material_code'],
"materialName": row['node_material_name'],
"bomLevel": row['bom_level'],
"usageQty": row['usage_qty'],
"parentNodeId": row['parent_node_id'],
"unitName": "",
"castingWeight": None,
"children": []
}
root_tree = {
"id": "root",
"materialCode": parent_info['parent_material_code'],
"materialName": parent_info['parent_material_name'],
"bomLevel": "0",
"usageQty": 1.0,
"unitName": "",
"castingWeight": None,
"children": []
}
for node_id, node_data in nodes_map.items():
parent_id = node_data['parentNodeId']
if parent_id is None:
root_tree['children'].append(node_data)
else:
if parent_id in nodes_map:
nodes_map[parent_id]['children'].append(node_data)
# 5. 递归计算累加成本
def calc_compare_costs(node):
code = node['materialCode']
usage_qty = float(node.get('usageQty', 1.0))
price_info = prices_dict.get(code, {})
# 获取基础单价
node['latestUnitPrice'] = price_info.get('latest_price', 0.0)
node['periodAUnitPrice'] = price_info.get('period_a_price', 0.0)
node['periodBUnitPrice'] = price_info.get('period_b_price', 0.0)
# 获取单位名称和铸件单件重量
node['unitName'] = price_info.get('unit_name', '')
node['castingWeight'] = price_info.get('casting_weight')
node['periodAStatus'] = price_info.get('period_a_status', 'missing')
node['periodBStatus'] = price_info.get('period_b_status', 'missing')
has_children = bool(node.get('children'))
# 判断一个节点在业务上是不是真的“组件”还是“采购件”
# 即使它没有子节点抓取时下面没数据但如果它自身没有采购价latestUnitPrice == 0
# 且在实际业务中它是挂在 BOM 上的(说明是个虚拟组件或大总成),我们也不该给它标颜色
is_real_purchased_leaf = (not has_children) and (node['latestUnitPrice'] > 0 or node['periodAStatus'] != 'missing' or node['periodBStatus'] != 'missing')
if not has_children:
# 叶子节点,总成本 = 单价 * 耗用量
node['totalLatest'] = node['latestUnitPrice'] * usage_qty
node['totalPeriodA'] = node['periodAUnitPrice'] * usage_qty
node['totalPeriodB'] = node['periodBUnitPrice'] * usage_qty
# 只要它是叶子节点,我们就给它标颜色。
# 如果它真的是虚拟件(基准价是 0且 A B 都是 missing那就大大方方给它标红点
# 用户刚才反馈说有的子件有基准价但是连点都没标,就是因为之前那个 is_real_purchased_leaf 过滤得太严格或者有 Bug。
node['showAStatus'] = node['periodAStatus']
node['showBStatus'] = node['periodBStatus']
else:
total_latest = 0.0
total_a = 0.0
total_b = 0.0
for child in node['children']:
calc_compare_costs(child)
total_latest += child['totalLatest']
total_a += child['totalPeriodA']
total_b += child['totalPeriodB']
# 父件总成本 = (子件成本之和) * 父件自身的耗用量
# 如果累加为0但自身有价格使用 (自身单价 * 自身耗用量) 作为兜底
node['totalLatest'] = (total_latest if total_latest > 0 else node['latestUnitPrice']) * usage_qty
node['totalPeriodA'] = (total_a if total_a > 0 else node['periodAUnitPrice']) * usage_qty
node['totalPeriodB'] = (total_b if total_b > 0 else node['periodBUnitPrice']) * usage_qty
# 父件的累加成本是由众多子件构成的,所以不显示单一的黄/红状态灯
node['showAStatus'] = 'normal'
node['showBStatus'] = 'normal'
calc_compare_costs(root_tree)
# 为了配合 ElementUI 的树形表格,根节点必须包装在数组里
return jsonify([root_tree])
def open_browser():
"""延迟 1.5 秒后自动打开系统默认浏览器"""
time.sleep(1.5)
url = "http://127.0.0.1:5050"
print(f"🚀 正在自动打开浏览器: {url}")
webbrowser.open(url)
if __name__ == '__main__':
# 启动前开启一个线程去拉起浏览器
threading.Thread(target=open_browser, daemon=True).start()
# 启动后端服务
print("🚀 前端展示后端服务已启动!请在浏览器访问: http://127.0.0.1:5050")
# 更改默认端口为 5050避开 macOS 控制中心的 5000 端口占用
app.run(debug=False, host='0.0.0.0', port=5050)