Files
datie-bom/browser_login/analysis_service.py
2026-06-22 14:34:23 +08:00

398 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import re
from pathlib import Path
import sys
# Put this file's own directory at the front of sys.path so that the
# `config` module imported below resolves (presumably it lives next to
# this file -- confirm against the repository layout).
sys.path.insert(0, str(Path(__file__).parent))
from config import OUTPUT_DIR
# Location of the SQLite database file, inside OUTPUT_DIR.
DB_PATH = OUTPUT_DIR / 'erp_data.db'
class MaterialReconciliationService:
    """Reconcile material issue records against work orders and BOM quantities.

    All data lives in the SQLite database at ``self.db_path``. The methods
    read the tables ``issue_receipt_details``, ``abnormal_report`` and
    ``bom_child``; only ``step1_extract_and_match_work_orders`` writes.
    """

    # Explicit work-order number such as "SFC018845" (matched case-insensitively).
    _SFC_PATTERN = re.compile(r'(SFC\d+)', re.IGNORECASE)
    # Fallback: the first run of four or more consecutive digits in the remarks.
    _DIGITS_PATTERN = re.compile(r'(\d{4,})')
    # Maximum number of "?" placeholders per IN (...) query. Kept well below
    # 999, the default bound-variable limit of older SQLite builds.
    _IN_CLAUSE_BATCH = 500

    def __init__(self):
        """Bind the service to the module-level database path."""
        self.db_path = DB_PATH

    def get_conn(self):
        """Open and return a new SQLite connection; the caller must close it."""
        return sqlite3.connect(self.db_path)

    def get_data_period(self):
        """Return the first and last day of the current calendar month.

        The period is derived from the local system clock only; the database
        is not queried.

        Returns:
            dict with keys ``"start"`` and ``"end"``, both ``YYYY-MM-DD`` strings.
        """
        import calendar
        import datetime

        today = datetime.date.today()
        days_in_month = calendar.monthrange(today.year, today.month)[1]
        return {
            "start": today.replace(day=1).strftime('%Y-%m-%d'),
            "end": today.replace(day=days_in_month).strftime('%Y-%m-%d'),
        }

    @staticmethod
    def _day_bounds(start_date, end_date):
        """Expand two date strings into inclusive whole-day timestamp bounds."""
        return [f"{start_date} 00:00:00", f"{end_date} 23:59:59"]

    @staticmethod
    def _ensure_inferred_column(cursor):
        """Add ``inferred_production_order_no`` to the issue table if missing.

        The schema is inspected first instead of blindly running ALTER TABLE
        and swallowing ``OperationalError``, so unrelated failures (for
        example a missing table) are not silently hidden.
        """
        cursor.execute("PRAGMA table_info(issue_receipt_details)")
        # Column 1 of each PRAGMA table_info row is the column name.
        existing = {str(col[1]).lower() for col in cursor.fetchall()}
        if 'inferred_production_order_no' in existing:
            return
        cursor.execute("ALTER TABLE issue_receipt_details ADD COLUMN inferred_production_order_no TEXT;")
        print("[信息] 已在数据库中新建 inferred_production_order_no 字段。")

    @classmethod
    def _infer_work_order(cls, prod_no, combined_remarks, valid_sfcs):
        """Return the work-order number for one issue row, or ``''`` if none.

        Resolution order:
          1. a non-blank ``prod_no`` is used as-is (stripped);
          2. an explicit ``SFC<digits>`` token in the remarks (upper-cased);
          3. the first run of >= 4 digits, tried as ``SFC0<digits>`` and then
             ``SFC<digits>`` against ``valid_sfcs``. If neither is known, the
             ``SFC0<digits>`` form is still returned as a reference value.
        """
        if prod_no and prod_no.strip():
            return prod_no.strip()
        if not combined_remarks.strip():
            return ''

        sfc_match = cls._SFC_PATTERN.search(combined_remarks)
        if sfc_match:
            return sfc_match.group(1).upper()

        digits_match = cls._DIGITS_PATTERN.search(combined_remarks)
        if not digits_match:
            return ''
        digits = digits_match.group(1)
        zero_padded = f"SFC0{digits}"
        if zero_padded in valid_sfcs:
            return zero_padded
        unpadded = f"SFC{digits}"
        if unpadded in valid_sfcs:
            return unpadded
        # Not validated against the known work orders; kept as a best guess.
        return zero_padded

    def step1_extract_and_match_work_orders(self):
        """Step 1: fill ``inferred_production_order_no`` for every issue row.

        Rows that already carry ``production_order_no`` copy it; otherwise a
        work-order number is inferred from the three remark columns (see
        ``_infer_work_order``). Rows with no inference are written as ``''``.

        Returns:
            int: number of rows written, i.e. every row that was read,
            including those whose inferred value is empty.
        """
        print("[开始] 正在连接数据库并准备清洗工单数据...")
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            self._ensure_inferred_column(cursor)

            print("[执行] 正在读取所有发料单明细...")
            cursor.execute("SELECT id, production_order_no, work_orders_remark, detailed_remark, production_order_remark FROM issue_receipt_details")
            rows = cursor.fetchall()
            print(f"[执行] 共读取到 {len(rows)} 条发料记录,开始进行正则比对...")

            # Known work-order numbers, used to validate digit-only candidates.
            cursor.execute("SELECT DISTINCT work_orders_number FROM abnormal_report")
            valid_sfcs = {r[0].upper() for r in cursor.fetchall() if r[0]}

            updates = []
            matched_count = 0
            for row_id, prod_no, work_remark, detail_remark, prod_remark in rows:
                combined_remarks = f"{work_remark or ''} {detail_remark or ''} {prod_remark or ''}"
                inferred_sfc = self._infer_work_order(prod_no, combined_remarks, valid_sfcs)
                if inferred_sfc:
                    matched_count += 1
                updates.append((inferred_sfc, row_id))

            print(f"[信息] 匹配完成。有 {matched_count} 条数据成功匹配到工单号。正在将结果写回数据库...")
            cursor.executemany(
                "UPDATE issue_receipt_details SET inferred_production_order_no = ? WHERE id = ?",
                updates
            )
            conn.commit()
        finally:
            # Release the connection even when a query fails.
            conn.close()
        print(f"[完成] 数据库更新完毕!本次清洗共处理了 {len(updates)} 条数据。")
        return len(updates)

    def step2_get_unmatched_materials(self, start_date=None, end_date=None):
        """Step 2: list issue rows that have no inferred work-order number.

        Args:
            start_date: optional ``YYYY-MM-DD`` lower bound on ``execution_time``.
            end_date: optional ``YYYY-MM-DD`` upper bound on ``execution_time``.
                The date filter is applied only when BOTH bounds are given.

        Returns:
            list[dict]: one dict per row, newest ``execution_time`` first.
        """
        where_clause = "WHERE (inferred_production_order_no = '' OR inferred_production_order_no IS NULL)"
        params = []
        if start_date and end_date:
            where_clause += " AND execution_time >= ? AND execution_time <= ?"
            params.extend(self._day_bounds(start_date, end_date))
        sql = f"""
            SELECT
                work_orders_number as issue_receipt_no,
                material_code,
                material_name,
                material_specification,
                issue_number,
                work_orders_remark,
                detailed_remark,
                production_order_remark,
                execution_time,
                executor_user_name,
                warehouse_name
            FROM issue_receipt_details
            {where_clause}
            ORDER BY execution_time DESC
        """
        conn = self.get_conn()
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(sql, params)
            rows = cursor.fetchall()
        finally:
            conn.close()
        return [dict(r) for r in rows]

    @staticmethod
    def _classify_status(bom_qty, actual_qty, diff_qty, fallback=''):
        """Map BOM quantity vs. issued quantity to a status label.

        "Not issued" (BOM > 0, nothing issued) is tested first: it always has
        a negative difference, so testing it after "under-issued" would make
        it unreachable.
        """
        if bom_qty > 0 and actual_qty == 0:
            return '未发料'
        if diff_qty > 0 and bom_qty > 0:
            return '超领发料'
        if diff_qty < 0:
            return '少领发料'
        if diff_qty == 0 and bom_qty > 0:
            return '发料正常'
        if bom_qty == 0 and actual_qty > 0:
            return 'BOM外发料'
        return fallback

    @classmethod
    def _load_bom_trees(cls, cursor, product_codes):
        """Build ``{product_code: [root nodes]}`` from the ``bom_child`` table.

        ``cursor`` must come from a connection whose ``row_factory`` is
        ``sqlite3.Row``. Nodes whose ``parent_node_id`` is not NULL and does
        not match any fetched row of the same product are left out.
        """
        from collections import defaultdict

        unique_codes = list(set(product_codes))
        children_by_product = defaultdict(list)
        # Query in batches so the placeholder count stays bounded.
        for offset in range(0, len(unique_codes), cls._IN_CLAUSE_BATCH):
            batch = unique_codes[offset:offset + cls._IN_CLAUSE_BATCH]
            placeholders = ','.join(['?'] * len(batch))
            cursor.execute(f"""
                SELECT id, parent_material_code, node_material_code, node_material_name, bom_level, parent_node_id, usage_qty
                FROM bom_child
                WHERE parent_material_code IN ({placeholders})
            """, batch)
            for row in cursor.fetchall():
                children_by_product[row['parent_material_code']].append(dict(row))

        bom_trees = {}
        for p_code, children in children_by_product.items():
            nodes_map = {
                row['id']: {
                    'id': f"node_{row['id']}",
                    'material_code': row['node_material_code'],
                    'material_name': row['node_material_name'],
                    'bom_level': row['bom_level'],
                    'parent_node_id': row['parent_node_id'],
                    'bom_qty': 0,
                    'actual_qty': 0,
                    'diff_qty': 0,
                    'status': '',
                    'children': []
                }
                for row in children
            }
            root_children = []
            for row in children:
                node = nodes_map[row['id']]
                parent_id = row['parent_node_id']
                if parent_id is None:
                    root_children.append(node)
                elif parent_id in nodes_map:
                    nodes_map[parent_id]['children'].append(node)
            bom_trees[p_code] = root_children
        return bom_trees

    def _fill_tree_qty(self, nodes, sfc_id, boms_for_sfc, actuals_for_sfc, processed_materials):
        """Recursively write quantities and status into a cloned BOM tree.

        Every visited material code is added to ``processed_materials`` so the
        caller can tell which materials are not part of the tree.
        """
        for idx, node in enumerate(nodes):
            # Prefix with the work order and position so node ids stay unique
            # across work orders in the front-end tree.
            node['id'] = f"{sfc_id}_{node['id']}_{idx}"
            m_code = node['material_code']
            node['bom_qty'] = boms_for_sfc.get(m_code, {}).get('bom_qty', 0)
            node['actual_qty'] = actuals_for_sfc.get(m_code, {}).get('actual_qty', 0)
            node['diff_qty'] = round(node['actual_qty'] - node['bom_qty'], 4)
            node['status'] = self._classify_status(
                node['bom_qty'], node['actual_qty'], node['diff_qty'],
                fallback='无发料任务',
            )
            processed_materials.add(m_code)
            self._fill_tree_qty(node['children'], sfc_id, boms_for_sfc, actuals_for_sfc, processed_materials)

    def step3_bom_reconciliation(self, start_date=None, end_date=None, match_type='official'):
        """Step 3: compare issued quantities with BOM quantities per work order.

        Args:
            start_date: optional ``YYYY-MM-DD`` lower bound.
            end_date: optional ``YYYY-MM-DD`` upper bound. Date filters
                (``execution_time`` for issues, ``order_date`` for BOM rows)
                are applied only when BOTH bounds are given.
            match_type: ``'official'`` groups issue rows by
                ``production_order_no`` and reports the union of work orders
                found in BOM rows and issue rows. Any other value groups rows
                that lack ``production_order_no`` by
                ``inferred_production_order_no`` and reports only work orders
                that have issue rows.

        Returns:
            list[dict]: one node per work order, sorted by work-order number
            in descending order. Each node's ``children`` holds the product's
            BOM tree (when available in ``bom_child``) followed by a flat list
            of materials that are not part of that tree.
        """
        import copy

        if match_type == 'official':
            where_actual = "WHERE production_order_no != '' AND production_order_no IS NOT NULL"
            sfc_field = "production_order_no"
        else:
            where_actual = "WHERE (production_order_no = '' OR production_order_no IS NULL) AND inferred_production_order_no != '' AND inferred_production_order_no IS NOT NULL"
            sfc_field = "inferred_production_order_no"

        params_actual = []
        where_bom = ""
        params_bom = []
        if start_date and end_date:
            where_actual += " AND execution_time >= ? AND execution_time <= ?"
            params_actual.extend(self._day_bounds(start_date, end_date))
            where_bom = "WHERE order_date >= ? AND order_date <= ?"
            params_bom.extend(self._day_bounds(start_date, end_date))

        sql_actual = f"""
            SELECT
                {sfc_field} as sfc,
                material_code,
                material_name,
                SUM(issue_number) as total_actual_issue
            FROM issue_receipt_details
            {where_actual}
            GROUP BY {sfc_field}, material_code
        """
        sql_bom = f"""
            SELECT
                work_orders_number as sfc,
                product_code,
                product_name,
                material_code,
                material_name,
                theoretical_issue_qty as bom_qty
            FROM abnormal_report
            {where_bom}
        """

        conn = self.get_conn()
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(sql_actual, params_actual)
            actuals = cursor.fetchall()
            cursor.execute(sql_bom, params_bom)
            boms = cursor.fetchall()

            sfc_to_product = {}
            sfc_bom_dict = {}  # {sfc: {material_code: {bom_qty, material_name}}}
            for r in boms:
                sfc = r['sfc']
                # A later row for the same (sfc, material) replaces the earlier one.
                sfc_bom_dict.setdefault(sfc, {})[r['material_code']] = {
                    'bom_qty': r['bom_qty'] or 0,
                    'material_name': r['material_name']
                }
                if r['product_code']:
                    sfc_to_product[sfc] = {
                        'product_code': r['product_code'],
                        'product_name': r['product_name']
                    }

            sfc_actual_dict = {}  # {sfc: {material_code: {actual_qty, material_name}}}
            for r in actuals:
                sfc_actual_dict.setdefault(r['sfc'], {})[r['material_code']] = {
                    'actual_qty': r['total_actual_issue'] or 0,
                    'material_name': r['material_name']
                }

            # Load the BOM child trees for every product that appears in the BOM rows.
            product_codes = [p['product_code'] for p in sfc_to_product.values() if p['product_code']]
            bom_trees = self._load_bom_trees(cursor, product_codes) if product_codes else {}
        finally:
            # Release the connection even when a query fails.
            conn.close()

        if match_type == 'official':
            all_sfcs = set(sfc_bom_dict) | set(sfc_actual_dict)
        else:
            all_sfcs = set(sfc_actual_dict)

        result = []
        for sfc in all_sfcs:
            prod_info = sfc_to_product.get(sfc, {})
            p_code = prod_info.get('product_code', '')
            p_name = prod_info.get('product_name', '无工单关联产品')
            sfc_node = {
                'id': f"sfc_{sfc}",
                'sfc': sfc,
                'material_code': p_code,
                'material_name': f"【成品】{p_name}",
                'bom_qty': '',
                'actual_qty': '',
                'diff_qty': '',
                'status': '',
                'children': []
            }
            boms_for_sfc = sfc_bom_dict.get(sfc, {})
            actuals_for_sfc = sfc_actual_dict.get(sfc, {})
            processed_materials = set()

            if p_code in bom_trees:
                # Clone so that per-work-order quantities never leak into the
                # shared template tree of the product.
                tree_clone = copy.deepcopy(bom_trees[p_code])
                self._fill_tree_qty(tree_clone, sfc, boms_for_sfc, actuals_for_sfc, processed_materials)
                sfc_node['children'].extend(tree_clone)

            # Materials outside the tree (or all of them when there is no
            # tree) become direct children of the work order. They are sorted
            # so the output order and the generated ids are deterministic.
            extra_materials = (set(boms_for_sfc) | set(actuals_for_sfc)) - processed_materials
            ordered_extras = sorted(extra_materials, key=lambda code: '' if code is None else str(code))
            for i, m_code in enumerate(ordered_extras):
                bom_info = boms_for_sfc.get(m_code, {})
                act_info = actuals_for_sfc.get(m_code, {})
                bom_q = bom_info.get('bom_qty', 0)
                act_q = act_info.get('actual_qty', 0)
                m_name = bom_info.get('material_name') or act_info.get('material_name') or '未知名称'
                diff_q = round(act_q - bom_q, 4)
                sfc_node['children'].append({
                    'id': f"extra_{sfc}_{m_code}_{i}",
                    'sfc': '',
                    'material_code': m_code,
                    'material_name': m_name,
                    'bom_qty': bom_q,
                    'actual_qty': act_q,
                    'diff_qty': diff_q,
                    'status': self._classify_status(bom_q, act_q, diff_q),
                    'children': []
                })
            result.append(sfc_node)

        # Descending by work-order number. A NULL work-order number sorts as
        # the empty string instead of raising TypeError against str keys.
        result.sort(key=lambda node: '' if node['sfc'] is None else node['sfc'], reverse=True)
        return result
if __name__ == '__main__':
    # Manual smoke run: execute the three reconciliation steps in order and
    # print a short summary of each.
    service = MaterialReconciliationService()
    print("Executing Step 1: Matching work orders...")
    updated = service.step1_extract_and_match_work_orders()
    print(f"Updated {updated} issue receipts with inferred production orders.")
    unmatched = service.step2_get_unmatched_materials()
    print(f"Found {len(unmatched)} unmatched materials.")
    recon = service.step3_bom_reconciliation()
    # The top-level work-order nodes always carry an empty status; the
    # reconciliation verdicts sit on their descendants, so walk the whole
    # tree instead of filtering the top level only.
    abnormal_statuses = {'超领发料', 'BOM外发料'}
    abnormal_count = 0
    pending = list(recon)
    while pending:
        node = pending.pop()
        if node['status'] in abnormal_statuses:
            abnormal_count += 1
        pending.extend(node['children'])
    print(f"Found {abnormal_count} abnormal BOM issues.")