Files
datie-bom/browser_login/fetch_basis_quality_incremental.py

148 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
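"""
Basis Quality Report (质量报表) - sliding time-window incremental fetch.

Goal: from inside the logged-in browser page, send report-search requests whose
order-date filters ("下单日期(开始)" / "下单日期(结束)") are pinned to a computed
time window, and capture the responses through the page's network listener.
"""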
import sys
import json
import time
import random
import urllib.parse
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from login import get_page, log
from config import OUTPUT_DIR
# Home page the browser is sent to before navigating to the report.
HOME_URL = "https://yunmes.tftykj.cn/"
# Target string handed to the page listener to select the report-search API traffic.
API_TARGET = "SearchCustomReportBySQL_Proxy"
# JSON file, under the configured output directory, that receives the collected rows.
SAVE_PATH = OUTPUT_DIR / "basis_quality_incremental.json"
# Endpoint the injected fetch() posts to; relative, so it resolves against the open page.
_REPORT_API_PATH = "/api/services/TfTechApi/SQLSolution/SearchCustomReportBySQL_Proxy"
# Intermediate results are flushed to disk every this many pages.
_CHECKPOINT_EVERY = 10


def _build_report_payload(page_no, start_value, end_value, rows=50):
    """Return the form-urlencoded POST body for one page of the report query.

    The body carries two date filters: field 17647 "下单日期(结束)" receives
    *end_value* and field 17646 "下单日期(开始)" receives *start_value*.
    Parentheses are left unescaped, as in the request this was modelled on.
    """

    def date_filter(index, field_id, label, value, enable_val):
        prefix = f"sqlFilter[fieldList][{index}]"
        entries = [
            ("id", field_id),
            ("field", label),
            ("fieldTranslate", f"[{label}]"),
            ("startValue", value),
            ("endValue", ""),
            ("compareEnum", 0),
            ("fieldDataType", 2),
            ("orderNumber", ""),
            ("orderType", 0),
            ("isTimeLimit", "false"),
            ("limitLength", 0),
            ("dateType", 1),
            ("dateDefaultType", 0),
            ("isSqlField", "false"),
            ("condition", 0),
            ("getValue", ""),
            ("backgroundColor", ""),
            ("fontColor", ""),
            # "isSeachParam" (sic) is the key name used by the request; keep the spelling.
            ("isSeachParam", "true"),
            ("defaultValue", ""),
            ("width", ""),
            ("defaultTime", ""),
            ("searchParamEnableVal", enable_val),
            ("optionMode", 0),
        ]
        return [(f"{prefix}[{key}]", val) for key, val in entries]

    pairs = [("page", page_no), ("rows", rows), ("id", 80)]
    pairs += date_filter(0, 17647, "下单日期(结束)", end_value, 0)
    pairs += date_filter(1, 17646, "下单日期(开始)", start_value, 1)
    pairs.append(("isAll", "false"))

    quote = urllib.parse.quote
    return "&".join(
        f"{quote(key, safe='()')}={quote(str(val), safe='()')}" for key, val in pairs
    )


def _extract_items(data):
    """Return the row list from a decoded response, or None if the shape is unexpected."""
    if not isinstance(data, dict) or "result" not in data:
        return None
    result = data["result"]
    if isinstance(result, dict):
        # A missing or null "items" is treated as an empty page.
        return result.get("items") or []
    if isinstance(result, list):
        return result
    return []


def _save_items(items):
    """Write *items* to SAVE_PATH as pretty-printed UTF-8 JSON."""
    # Assumes SAVE_PATH is a pathlib.Path (it is built with the "/" operator).
    SAVE_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(SAVE_PATH, "w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False, indent=2)


def fetch_basis_quality_incremental(
    days: int = 90, max_pages: int = 3, port: int = 9222
) -> None:
    """Fetch the quality report for a sliding date window and save it as JSON.

    Opens the report view in the attached browser, then for each page injects a
    fetch() POST carrying the date-window filters and reads the response through
    the page listener. Collected rows are written to SAVE_PATH, including rows
    gathered before a timeout, a malformed response or an unexpected exception.

    Args:
        days: Length of the window, counted back from today.
        max_pages: Maximum number of result pages to request (default 3, the
            original test-mode limit).
        port: Debug port of the browser to attach to.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)
    start_date_str = start_date.strftime("%Y-%m-%d 00:00:00")
    end_date_str = end_date.strftime("%Y-%m-%d 23:59:59")

    log("INFO", "=== 🚀 启动质量报表 - 时间滑动增量抓取 ===")
    log("INFO", f"📅 设定的滑动窗口: {start_date_str} -> {end_date_str}")

    page = get_page(port=port)
    all_clean_items = []
    failed = False
    try:
        log("INFO", f"正在回到主页起点: {HOME_URL}")
        page.get(HOME_URL)
        page.wait.load_start()
        time.sleep(2)

        menus = [
            ("进入质量报表", 'xpath://*[@id="el-collapse-content-21"]/div/div/div/div[1]/div/div/div[6]/div')
        ]
        log("INFO", "开始模拟人工点击左侧导航菜单...")
        for name, xpath in menus:
            ele = page.ele(xpath, timeout=5)
            if not ele:
                log("ERR", f"找不到菜单元素: {name}")
                return
            try:
                ele.click()
            except Exception:
                # Fall back to a JS click when the native click fails.
                page.run_js("arguments[0].click();", ele)
        log("OK", "✅ 成功点开质量报表界面!")
        time.sleep(2)

        log("INFO", f"开启底层拦截网: {API_TARGET}")
        page.listen.start(API_TARGET)

        for current_page in range(1, max_pages + 1):
            # The listener cannot rewrite an outgoing POST body, so the request is
            # issued from the page itself with our own payload.
            log("INFO", f"正在通过底层 JS Fetch 强行注入带时间窗口的请求... (页码: {current_page})")
            payload = _build_report_payload(current_page, start_date_str, end_date_str)
            # json.dumps yields a correctly escaped JS string literal.
            fetch_js = f"""
            fetch({json.dumps(_REPORT_API_PATH)}, {{
                method: 'POST',
                headers: {{
                    'accept': 'application/json, text/javascript, */*; q=0.01',
                    'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
                    'x-requested-with': 'XMLHttpRequest'
                }},
                body: {json.dumps(payload)}
            }});
            """
            page.run_js(fetch_js)

            packet = page.listen.wait(timeout=15)
            if not packet:
                log("ERR", f"{current_page} 页注入请求超时或未触发,中止抓取。")
                break

            body = packet.response.body
            try:
                data = body if isinstance(body, (dict, list)) else json.loads(body)
            except (TypeError, ValueError) as e:
                # Empty or non-JSON body: stop paging but keep what was collected.
                log("ERR", f"{current_page} 页响应无法解析为 JSON,中止: {e}")
                break

            items = _extract_items(data)
            if items is None:
                log("ERR", f"{current_page} 页数据结构异常,中止。")
                break
            if not items:
                log("WARN", f"{current_page} 页返回了空列表,可能该时间段内无数据。")
                break

            all_clean_items.extend(items)
            log("OK", f"{current_page} 页清洗完成,累计提取 {len(all_clean_items)} 条数据。")
            if current_page % _CHECKPOINT_EVERY == 0:
                _save_items(all_clean_items)
    except Exception as e:
        failed = True
        log("ERR", f"发生全局异常: {e}")
    finally:
        try:
            page.listen.stop()
            log("INFO", "🛑 已释放浏览器监听资源。")
        except Exception as e:
            # Best-effort cleanup: report the failure but never raise from here.
            log("WARN", f"释放浏览器监听资源失败: {e}")

    # Saving happens outside the try block so rows gathered before a failure are kept.
    if all_clean_items:
        try:
            _save_items(all_clean_items)
        except OSError as e:
            log("ERR", f"保存数据失败: {e}")
            return
        if failed:
            log("WARN", f"抓取因异常中断,已保存中断前提取的 {len(all_clean_items)} 条数据。")
        else:
            log("OK", f"🎉 抓取完成!总计成功提取 {len(all_clean_items)} 条数据。")
        log("OK", f"数据已保存至: {SAVE_PATH}")
# Script entry point: run one fetch with the default settings when executed directly.
if __name__ == "__main__":
    fetch_basis_quality_incremental()