Files
datie-bom/browser_login/import_to_sqlite.py
2026-06-22 16:48:54 +08:00

522 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import json
from pathlib import Path
import os
from config import OUTPUT_DIR, DB_PATH
# Input JSON files, all located under OUTPUT_DIR (imported from config).
# Receipt details, read by import_receipt_details().
RECEIPT_JSON = OUTPUT_DIR / "receipt_details_full_clean.json"
# BOM cost trees, read by import_bom_data().
BOM_JSON = OUTPUT_DIR / "bom_cost_full_tree_final.json"
# Material-issue details, read by import_issue_receipt_details().
ISSUE_JSON = OUTPUT_DIR / "issue_receipt_details_full.json"
def _dedupe_and_create_unique_index(cursor, table, index_name, columns):
    """Purge duplicate rows of *table*, then create a UNIQUE index on *columns*.

    For each distinct combination of *columns* only the row with the smallest
    ``id`` is kept. The purge runs first because CREATE UNIQUE INDEX fails
    when the table still contains duplicate keys. *table*, *index_name* and
    *columns* must be trusted identifiers (they are interpolated into SQL).
    """
    column_list = ", ".join(columns)
    cursor.execute(f'''
        DELETE FROM {table}
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM {table}
            GROUP BY {column_list}
        )
    ''')
    cursor.execute(f'''
        CREATE UNIQUE INDEX IF NOT EXISTS {index_name}
        ON {table}({column_list})
    ''')


def _create_schema(cursor):
    """Create every table and index used by the importers (idempotent)."""
    # Receipt details table.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS receipt_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            purchase_order_code TEXT,
            row_no INTEGER,
            material_code TEXT,
            material_name TEXT,
            material_specification TEXT,
            warehouse_code TEXT,
            warehouse_name TEXT,
            supplier_code TEXT,
            supplier_name TEXT,
            unit_name TEXT,
            conversion_unit TEXT,
            receive_price REAL,
            receipt_time TEXT,
            purchase_qty REAL,
            receive_qty REAL,
            total_amount REAL
        )
    ''')
    # Lookup indexes for the receipt details table.
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_material_code ON receipt_details(material_code)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_supplier_name ON receipt_details(supplier_name)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_receipt_time ON receipt_details(receipt_time)')

    # Material-issue details table.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS issue_receipt_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            production_order_no TEXT,
            product_material_code TEXT,
            product_material_name TEXT,
            product_material_specification TEXT,
            work_orders_number TEXT,
            status INTEGER,
            material_specification TEXT,
            material_name TEXT,
            material_code TEXT,
            issue_number REAL,
            has_issue_number REAL,
            amount REAL,
            cost_price REAL,
            issue_amount REAL,
            production_order_remark TEXT,
            detailed_remark TEXT,
            unit_name TEXT,
            warehouse_name TEXT,
            line_number INTEGER,
            work_orders_remark TEXT,
            executor_user_name TEXT,
            material_model TEXT,
            execution_time TEXT,
            materials_user_name TEXT,
            product_material_model TEXT,
            custom_field TEXT,
            department_information_code TEXT,
            department_information_name TEXT,
            image_file TEXT,
            issue_amount_total REAL,
            material_group_code TEXT,
            material_group_name TEXT,
            numnber_of_reserved_digits INTEGER,
            place_ment_strategy INTEGER,
            price REAL,
            sales_order_code TEXT
        )
    ''')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_issue_material_code ON issue_receipt_details(material_code)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_issue_execution_time ON issue_receipt_details(execution_time)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_issue_work_orders_number ON issue_receipt_details(work_orders_number)')
    # Unique key used as the UPSERT conflict target by the issue importer.
    _dedupe_and_create_unique_index(
        cursor,
        'issue_receipt_details',
        'idx_issue_unique',
        ('work_orders_number', 'line_number', 'material_code'),
    )

    # NOTE: IF NOT EXISTS (rather than drop-and-recreate) keeps data the user
    # has already scraped when a packaged build is redeployed.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS bom_parent (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            parent_material_code TEXT UNIQUE,
            parent_material_name TEXT
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS bom_child (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            parent_material_code TEXT, -- 归属的最顶层父件
            node_material_code TEXT,
            node_material_name TEXT,
            bom_level INTEGER,
            parent_node_id INTEGER, -- 指向上一级子件的 id如果是一级子件则为空
            usage_qty REAL DEFAULT 1.0,
            FOREIGN KEY(parent_material_code) REFERENCES bom_parent(parent_material_code),
            FOREIGN KEY(parent_node_id) REFERENCES bom_child(id)
        )
    ''')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_bom_child_parent_code ON bom_child(parent_material_code)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_bom_child_node_code ON bom_child(node_material_code)')

    # Production work orders table.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS work_orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            work_orders_number TEXT,
            line_number INTEGER,
            material_code TEXT,
            material_name TEXT,
            material_specification TEXT,
            status INTEGER,
            unit_name TEXT,
            cost_price REAL,
            issue_number REAL,
            total_issue_number REAL,
            issue_amount REAL,
            issue_amount_total REAL,
            executor_user_name TEXT,
            execution_time TEXT,
            production_order_no TEXT,
            warehouse_name TEXT,
            materials_user_name TEXT,
            work_orders_remark TEXT
        )
    ''')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_work_orders_number ON work_orders(work_orders_number)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_work_orders_material_code ON work_orders(material_code)')
    _dedupe_and_create_unique_index(
        cursor,
        'work_orders',
        'idx_work_orders_unique',
        ('work_orders_number', 'line_number', 'material_code'),
    )

    # Abnormal material-issue report table.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS abnormal_report (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            work_orders_number TEXT,
            product_code TEXT,
            product_name TEXT,
            status TEXT,
            completed_qty REAL,
            order_date TEXT,
            workshop TEXT,
            material_code TEXT,
            material_name TEXT,
            material_specification TEXT,
            unit_qty REAL,
            total_demand_qty REAL,
            warehouse_issue_qty REAL,
            theoretical_issue_qty REAL,
            issue_method TEXT,
            issue_status TEXT
        )
    ''')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_abnormal_work_orders_number ON abnormal_report(work_orders_number)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_abnormal_material_code ON abnormal_report(material_code)')
    # Purge duplicates first, like the other upsert tables, so a database
    # created before this index existed cannot make the index creation fail.
    _dedupe_and_create_unique_index(
        cursor,
        'abnormal_report',
        'idx_abnormal_unique',
        ('work_orders_number', 'material_code'),
    )


def init_db():
    """Open the database at DB_PATH and make sure the schema exists.

    All tables and indexes are created with IF NOT EXISTS, so calling this on
    an existing database keeps its data. Before each UNIQUE index is created,
    duplicate rows for that key are purged (the row with the smallest id is
    kept).

    Returns:
        An open sqlite3 connection with the schema changes committed. The
        caller is responsible for closing it.

    Raises:
        sqlite3.Error: if a schema statement fails; the connection is closed
            before the exception propagates.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        _create_schema(conn.cursor())
        conn.commit()
    except Exception:
        # Do not leak the connection when schema setup fails.
        conn.close()
        raise
    return conn
def import_receipt_details(conn):
    """Upsert the receipt detail records from RECEIPT_JSON into receipt_details.

    Rows are keyed by (purchase_order_code, row_no, material_code). When a key
    already exists, only receive_price, receive_qty, total_amount and
    receipt_time are refreshed. Records with an empty purchase order code or
    material code, or with a missing row number, are skipped. A row number of
    0 is a valid key and is imported.

    Args:
        conn: Open sqlite3 connection on which the receipt_details table
            already exists.

    Returns:
        None. Progress and per-row errors are printed; the transaction is
        committed at the end. If RECEIPT_JSON does not exist, a message is
        printed and nothing is changed.
    """
    if not RECEIPT_JSON.exists():
        print(f"找不到收货明细文件: {RECEIPT_JSON}")
        return
    print("开始导入收货明细数据...")
    with open(RECEIPT_JSON, 'r', encoding='utf-8') as f:
        data = json.load(f)

    cursor = conn.cursor()
    # Purge historical duplicates so the unique index below can be created.
    cursor.execute('''
        DELETE FROM receipt_details
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM receipt_details
            GROUP BY purchase_order_code, row_no, material_code
        )
    ''')
    # The UPSERT needs a unique index on its conflict target; create it here
    # explicitly so the statement below cannot fail for lack of one.
    cursor.execute('''
        CREATE UNIQUE INDEX IF NOT EXISTS idx_receipt_unique
        ON receipt_details(purchase_order_code, row_no, material_code)
    ''')

    count_processed = 0
    for item in data:
        po_code = item.get("采购订单号")
        row_no = item.get("行号")
        mat_code = item.get("物料代码")
        # Skip records whose key fields are empty. The row number is compared
        # against None/"" explicitly so that a legitimate row number 0 is kept.
        if not po_code or row_no in (None, "") or not mat_code:
            continue
        try:
            cursor.execute('''
                INSERT INTO receipt_details (
                    purchase_order_code, row_no, material_code, material_name,
                    material_specification, warehouse_code, warehouse_name,
                    supplier_code, supplier_name, unit_name, conversion_unit,
                    receive_price, receipt_time,
                    purchase_qty, receive_qty, total_amount
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(purchase_order_code, row_no, material_code) DO UPDATE SET
                    receive_price=excluded.receive_price,
                    receive_qty=excluded.receive_qty,
                    total_amount=excluded.total_amount,
                    receipt_time=excluded.receipt_time
            ''', (
                po_code,
                row_no,
                mat_code,
                item.get("物料名称"),
                item.get("物料规格"),
                item.get("仓库代码"),
                item.get("仓库名称"),
                item.get("供应商代码"),
                item.get("供应商名称"),
                item.get("单位名称"),
                item.get("转换单位"),
                item.get("收货单价"),
                item.get("收货时间"),
                item.get("进货数量"),
                item.get("收货数量"),
                item.get("收货总金额"),
            ))
            count_processed += 1
        except sqlite3.Error as e:
            # Best effort: report the failing row and continue with the rest.
            print(f"入库报错 收货单:{po_code} 行号:{row_no} 错误:{e}")

    conn.commit()
    print(f"成功处理(新增或更新) {count_processed} 条收货明细数据!")
def _insert_bom_tree(cursor, parent_material_code, tree_nodes, parent_node_id=None):
"""递归插入 BOM 树节点"""
for node in tree_nodes:
# 提取当前节点信息
node_code = node.get("childMaterialCode")
node_name = node.get("childMaterialName")
bom_level = node.get("bomLevel")
usage_qty = float(node.get("usageQty") or 1.0)
# 插入当前节点
cursor.execute('''
INSERT INTO bom_child (
parent_material_code, node_material_code, node_material_name, bom_level, parent_node_id, usage_qty
) VALUES (?, ?, ?, ?, ?, ?)
''', (parent_material_code, node_code, node_name, bom_level, parent_node_id, usage_qty))
# 获取刚插入的节点 ID作为其子节点的 parent_node_id
current_node_id = cursor.lastrowid
# 如果有子节点,递归插入
sub_items = node.get("sub_items", [])
if sub_items:
_insert_bom_tree(cursor, parent_material_code, sub_items, current_node_id)
def import_bom_data(conn):
    """Replace the contents of bom_parent/bom_child with the trees in BOM_JSON.

    Both tables are emptied first. Each record with a non-empty
    "parentMaterialCode" becomes one bom_parent row, and its "bom_cost_tree"
    (if any) is inserted through _insert_bom_tree. A record that raises
    sqlite3.IntegrityError is reported as a duplicate parent and skipped.
    Prints a message and returns without touching the database when BOM_JSON
    does not exist.
    """
    if not BOM_JSON.exists():
        print(f"找不到 BOM 成本文件: {BOM_JSON}")
        return

    print("开始导入 BOM 成本数据...")
    with open(BOM_JSON, 'r', encoding='utf-8') as handle:
        records = json.load(handle)

    cursor = conn.cursor()
    # Full refresh: children are removed before the parents they reference.
    for table in ('bom_child', 'bom_parent'):
        cursor.execute(f'DELETE FROM {table}')

    imported_parents = 0
    for record in records:
        code = record.get("parentMaterialCode")
        name = record.get("parentMaterialName")
        if code:
            try:
                cursor.execute('''
                    INSERT INTO bom_parent (parent_material_code, parent_material_name)
                    VALUES (?, ?)
                ''', (code, name))
                imported_parents += 1
                # Walk this parent's tree, starting with no parent node id.
                subtree = record.get("bom_cost_tree", [])
                if subtree:
                    _insert_bom_tree(cursor, code, subtree, parent_node_id=None)
            except sqlite3.IntegrityError:
                print(f"警告: 父件重复 {code},跳过")
        # Records without a parent code are ignored.

    conn.commit()

    # Report how many child nodes ended up in the table.
    cursor.execute('SELECT COUNT(*) FROM bom_child')
    (imported_children,) = cursor.fetchone()
    print(f"成功导入 {imported_parents} 个 BOM 父件,包含 {imported_children} 个子件节点!")
def import_issue_receipt_details(conn):
    """Upsert the material-issue records from ISSUE_JSON into issue_receipt_details.

    Rows are keyed by (work_orders_number, line_number, material_code). When a
    key already exists, the status, quantity, amount, warehouse, executor,
    execution time and requester columns are refreshed; the other columns keep
    their stored values. Records with an empty issue order number or material
    code, or with a missing line number, are skipped. A line number of 0 is a
    valid key and is imported.

    Args:
        conn: Open sqlite3 connection on which the issue_receipt_details table
            already exists.

    Returns:
        None. Progress and per-row errors are printed; the transaction is
        committed at the end. If ISSUE_JSON does not exist, a message is
        printed and nothing is changed.
    """
    if not ISSUE_JSON.exists():
        print(f"找不到发料明细文件: {ISSUE_JSON}")
        return
    print("开始导入发料明细数据...")
    with open(ISSUE_JSON, 'r', encoding='utf-8') as f:
        data = json.load(f)

    cursor = conn.cursor()
    # The UPSERT needs a unique index on its conflict target; create it here
    # explicitly so the statement below cannot fail for lack of one.
    cursor.execute('''
        CREATE UNIQUE INDEX IF NOT EXISTS idx_issue_unique
        ON issue_receipt_details(work_orders_number, line_number, material_code)
    ''')

    # NOTE: an earlier revision tried to infer an SFC number from the remark
    # fields here, but the result was never stored anywhere, so that dead
    # code has been dropped.
    count_processed = 0
    for item in data:
        # Key fields.
        wo_number = item.get("发料单号")
        line_no = item.get("行号")
        mat_code = item.get("物料代码")
        # Skip records whose key fields are empty. The line number is compared
        # against None/"" explicitly so that a legitimate line number 0 is kept.
        if not wo_number or line_no in (None, "") or not mat_code:
            continue
        try:
            cursor.execute('''
                INSERT INTO issue_receipt_details (
                    production_order_no, product_material_code, product_material_name, product_material_specification,
                    work_orders_number, status, material_specification, material_name, material_code,
                    issue_number, has_issue_number, amount, cost_price, issue_amount,
                    production_order_remark, detailed_remark, unit_name, warehouse_name, line_number,
                    work_orders_remark, executor_user_name, material_model, execution_time, materials_user_name,
                    product_material_model, custom_field, department_information_code, department_information_name,
                    image_file, issue_amount_total, material_group_code, material_group_name,
                    numnber_of_reserved_digits, place_ment_strategy, price, sales_order_code
                ) VALUES (
                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                )
                ON CONFLICT(work_orders_number, line_number, material_code) DO UPDATE SET
                    status=excluded.status,
                    issue_number=excluded.issue_number,
                    has_issue_number=excluded.has_issue_number,
                    amount=excluded.amount,
                    cost_price=excluded.cost_price,
                    issue_amount=excluded.issue_amount,
                    warehouse_name=excluded.warehouse_name,
                    executor_user_name=excluded.executor_user_name,
                    execution_time=excluded.execution_time,
                    materials_user_name=excluded.materials_user_name,
                    issue_amount_total=excluded.issue_amount_total
            ''', (
                item.get("生产任务单号"), item.get("生产物料代码"), item.get("生产物料名称"), item.get("生产物料规格"),
                wo_number, item.get("状态"), item.get("物料规格"), item.get("物料名称"), mat_code,
                item.get("发料数量"), item.get("已发料数量"), item.get("金额"), item.get("成本价"), item.get("发料金额"),
                item.get("生产订单备注"), item.get("明细备注"), item.get("单位名称"), item.get("仓库名称"), line_no,
                item.get("发料单备注"), item.get("执行人名称"), item.get("物料型号"), item.get("执行时间"), item.get("领料人"),
                item.get("生产物料型号"), item.get("自定义字段"), item.get("部门代码"), item.get("部门名称"),
                item.get("图片文件"), item.get("汇总金额"), item.get("物料组代码"), item.get("物料组名称"),
                item.get("单价小数位数"), item.get("单价进位策略"), item.get("单价"), item.get("销售订单号"),
            ))
            # An UPSERT that ends up updating a row cannot be told apart from
            # a fresh insert here, so inserts and updates share one counter.
            count_processed += 1
        except sqlite3.Error as e:
            # Best effort: report the failing row and continue with the rest.
            print(f"入库报错 发料单:{wo_number} 行号:{line_no} 错误:{e}")

    conn.commit()
    print(f"成功处理(新增或更新) {count_processed} 条发料明细数据!")
def import_abnormal_report_data(items):
    """Upsert abnormal-report records fetched from the API into abnormal_report.

    Opens its own connection through init_db(). Rows are keyed by
    (work_orders_number, material_code); when a key already exists only
    status, completed_qty, warehouse_issue_qty, theoretical_issue_qty and
    issue_status are refreshed. Records without a work order number or a
    material code are skipped.

    Args:
        items: Iterable of dicts keyed by the report's column names.

    Returns:
        The number of records that were inserted or updated without a
        database error.
    """
    conn = init_db()
    # try/finally: the connection must be released even when a malformed
    # record makes the loop raise something other than sqlite3.Error.
    try:
        cursor = conn.cursor()
        count_processed = 0
        for item in items:
            wo_number = item.get("生产工单号")
            mat_code = item.get("需求物料代码")
            if not wo_number or not mat_code:
                continue
            try:
                cursor.execute('''
                    INSERT INTO abnormal_report (
                        work_orders_number, product_code, product_name, status, completed_qty,
                        order_date, workshop, material_code, material_name, material_specification,
                        unit_qty, total_demand_qty, warehouse_issue_qty, theoretical_issue_qty,
                        issue_method, issue_status
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(work_orders_number, material_code) DO UPDATE SET
                        status=excluded.status,
                        completed_qty=excluded.completed_qty,
                        warehouse_issue_qty=excluded.warehouse_issue_qty,
                        theoretical_issue_qty=excluded.theoretical_issue_qty,
                        issue_status=excluded.issue_status
                ''', (
                    wo_number, item.get("产品代码"), item.get("产品名称"), item.get("工单状态"),
                    item.get("已完工数量"), item.get("下单日期"), item.get("生产车间"),
                    mat_code, item.get("需求物料名称"), item.get("需求物料规格"),
                    item.get("单机用量"), item.get("需求总量"), item.get("仓库发放数量"),
                    item.get("理论仓库出料数量"), item.get("发料方式"), item.get("发料情况"),
                ))
                count_processed += 1
            except sqlite3.Error as e:
                # Best effort: report the failing row and continue with the rest.
                print(f"异常报表入库报错 工单:{wo_number} 物料:{mat_code} 错误:{e}")
        conn.commit()
        return count_processed
    finally:
        conn.close()
if __name__ == "__main__":
    import sys

    print(f"数据库文件将保存在: {DB_PATH}")
    conn = init_db()

    # A "--*-only" flag restricts the run to a single importer. When several
    # are given, the first match in this table wins (bom, receipt, issue).
    cli_args = sys.argv[1:]
    single_imports = (
        ("--bom-only", import_bom_data),
        ("--receipt-only", import_receipt_details),
        ("--issue-only", import_issue_receipt_details),
    )
    selected = next(
        (importer for flag, importer in single_imports if flag in cli_args),
        None,
    )
    if selected is not None:
        selected(conn)
    else:
        # No flag given: run the full import.
        for importer in (
            import_receipt_details,
            import_bom_data,
            import_issue_receipt_details,
        ):
            importer(conn)

    conn.close()
    print("全部导入完成!你可以使用 SQLite 客户端连接 erp_data.db 查看数据。")