抓取生产工单,抓取发料异常

This commit is contained in:
hjq
2026-06-12 11:09:15 +08:00
parent 559ead8300
commit c5232bccc1
3 changed files with 208 additions and 154 deletions

View File

@@ -13,7 +13,7 @@ from login import get_page, log
from config import OUTPUT_DIR
HOME_URL = "https://yunmes.tftykj.cn/"
API_TARGET = "ReceiptDetailsCheck_SearchList_Proxy"
API_TARGET = "ReceiptDetailsCheckFinace_SearchList"
SAVE_PATH = OUTPUT_DIR / "receipt_details_full_clean.json"
def fetch_receipt_details_full():
@@ -32,53 +32,76 @@ def fetch_receipt_details_full():
all_clean_items = []
try:
log("INFO", f"正在回到主页起点: {HOME_URL}")
page.get(HOME_URL)
TARGET_URL = "https://yunmes.tftykj.cn/ReceiptDetailsCheckFinace"
log("INFO", f"正在直接访问目标页面: {TARGET_URL}")
page.get(TARGET_URL)
page.wait.load_start()
time.sleep(2)
menus = [
("第一层: 业务统计报表", 'xpath://*[@id="app"]/div/div[1]/div[1]/div[2]/div/div[1]/div/div[10]/div/p'),
("第二层: 财务业务报表", 'text:财务业务报表'),
("第三层: 财务收货明细报表", 'text:财务收货明细报表')
]
log("INFO", "开始模拟人工点击左侧导航菜单...")
for name, xpath in menus:
ele = page.ele(xpath, timeout=5)
if ele:
try: ele.click()
except: page.run_js("arguments[0].click();", ele)
time.sleep(1.5)
else:
log("ERR", f"找不到菜单元素: {name}")
return
log("OK", "✅ 成功点开收货明细报表界面!")
# 点击空白处隐藏菜单
blank_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[1]/div[2]/div[2]/div/div[1]/div'
blank_ele = page.ele(blank_xpath, timeout=3)
if blank_ele:
try: blank_ele.click()
except: page.run_js("arguments[0].click();", blank_ele)
time.sleep(0.5)
# 等待数据表格区域出现
table = page.ele("xpath://table | .el-table__body", timeout=15)
if table:
log("OK", "✅ 成功打开财务收货明细报表界面!")
else:
log("WARN", "表格元素未找到,继续执行")
log("INFO", f"开启底层数据拦截网: {API_TARGET}")
page.listen.start(API_TARGET)
packet = page.listen.wait(timeout=10)
# 为了能够获取当月的数据,强制设置时间为当月第一天到最后一天,并清理其他条件
import datetime, calendar
now = datetime.datetime.now()
first_day = datetime.date(now.year, now.month, 1).strftime('%Y-%m-%d')
last_day = datetime.date(now.year, now.month, calendar.monthrange(now.year, now.month)[1]).strftime('%Y-%m-%d')
if not packet:
log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
query_btn_xpath = 'xpath://*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/div[1]/div/button[1]/span'
query_btn = page.ele(query_btn_xpath, timeout=3)
log("INFO", f"正在自动设置查询时间范围: {first_day}{last_day}")
page.run_js(f"""
try {{
var dates = document.querySelectorAll('.datebox-f, .datetimebox-f, .el-date-editor input');
if (dates.length >= 2) {{
dates[0].value = '{first_day}';
dates[1].value = '{last_day}';
dates[0].dispatchEvent(new Event('input', {{ bubbles: true }}));
dates[0].dispatchEvent(new Event('change', {{ bubbles: true }}));
dates[1].dispatchEvent(new Event('input', {{ bubbles: true }}));
dates[1].dispatchEvent(new Event('change', {{ bubbles: true }}));
}}
}} catch(e) {{ console.log(e); }}
""")
time.sleep(1)
# 寻找并点击页面上的【查询】按钮
log("INFO", "尝试寻找并点击页面上的【查询】按钮...")
# 使用 DrissionPage 内置选择器尝试寻找
query_btn = page.ele('text=查询', timeout=3)
if not query_btn:
query_btn = page.ele('xpath://button[contains(., "查询")]', timeout=3)
if query_btn:
try: query_btn.click()
except: page.run_js("arguments[0].click();", query_btn)
packet = page.listen.wait(timeout=15)
if query_btn:
try: query_btn.click()
except: page.run_js("arguments[0].click();", query_btn)
else:
log("WARN", "常规选择器找不到查询按钮,尝试使用全局 JS 强行寻找...")
# 暴力兜底:通过 JS 遍历所有按钮和链接点击
clicked = page.run_js("""
var btns = document.querySelectorAll('button, a, .l-btn, .el-button');
for(var i=0; i<btns.length; i++) {
if(btns[i].innerText && btns[i].innerText.indexOf('查询') !== -1) {
btns[i].click();
return true;
}
}
return false;
""")
if not clicked:
log("ERR", "找不到查询按钮!")
page.listen.stop()
return
packet = page.listen.wait(timeout=15)
if not packet:
log("ERR", "未能拦截到第一页数据,可能网络超时或查询未触发。")
page.listen.stop()
@@ -91,8 +114,12 @@ def fetch_receipt_details_full():
body = packet.response.body
data = body if isinstance(body, (dict, list)) else json.loads(body)
# 设定开始抓取的页码,1表示从头开始抓全量数据
target_resume_page = 690
# 设定开始抓取的页码,根据已有数据量动态计算假设每页50条
target_resume_page = 1
if len(all_clean_items) > 0:
target_resume_page = max(1, len(all_clean_items) // 50)
# 截断已有数据,防止与即将重新抓取的页数重叠导致重复
all_clean_items = all_clean_items[:(target_resume_page - 1) * 50]
total_count = 0
if isinstance(data, dict) and "result" in data: