Files
yuanzhipeng 5107fdb74c chore(general): 更新项目配置文件
- 添加必要的配置项
- 优化现有设置
- 确保环境兼容性
2026-01-28 09:35:48 +08:00

111 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import json
def main(**kwargs) -> dict:
"""
函数节点的入口函数
Args:
**kwargs: 从前端配置的参数传入,可通过变量引用获取工作流上下文
Returns:
dict: 返回结果将作为节点输出,可被后续节点引用
"""
# 从 kwargs 获取参数,如果没有提供则使用默认值
url = kwargs.get('url', 'http://192.167.30.2:8088/datasource/sqlExecutionLog/testBatchSqlWithSchema')
# 请求头从kwargs获取或使用默认值
authorization_token = kwargs.get('authorization_token', 'Bearer eyJhbGciOiJIUzUxMiJ9.eyJ0b2tlbl90eXBlIjoiTE9HSU4iLCJsb2dpbl91c2VyX2tleSI6IjAxNzZiNjEyLTc2YWItNDdhMS1iYTRiLTdjNWU2ZTMxNDlmZCJ9.w7aOfDJDHtA4bwNKIvUVK2cf1yO_2F27d_eYuos-p1-XGrSQOX0D4ny0b8Js36MhXwBnF4GDcy8V1VobEN6zBA')
headers = {
'Authorization': authorization_token
}
# 请求体参数从kwargs获取或使用默认值
payload_id = kwargs.get('id', '2006300000000000001')
business_name = kwargs.get('businessName', 'OrderDelayWarningAnalysis')
business_description = kwargs.get('businessDescription', '订单延迟预警分析:依据历史订单的生产周期、物流延误、设备故障等特征,输出延迟概率与红/黄/绿预警等级')
datasource_id = kwargs.get('datasourceId', '57')
sql_template = kwargs.get('sqlTemplate', 'WITH production_cycle_stats AS (SELECT COALESCE(AVG(GREATEST(0, EXTRACT(DAY FROM last_updated_utc - event_time_utc))), 0) AS avg_production_days FROM fact_work_order WHERE status = \'CLOSED\' AND last_updated_utc >= event_time_utc), logistics_delay_stats AS (SELECT customer_id, AVG(GREATEST(0, EXTRACT(DAY FROM event_time_utc - doc_date_utc))) AS avg_logistics_delay_days, SUM(CASE WHEN EXTRACT(DAY FROM event_time_utc - doc_date_utc) > 3 THEN 1 ELSE 0 END) AS delay_count FROM fact_sales_shipment WHERE doc_date_utc IS NOT NULL AND event_time_utc IS NOT NULL GROUP BY customer_id), quality_issue_stats AS (SELECT COALESCE(ROUND(SUM(fail_qty) * 100.0 / NULLIF(SUM(pass_qty + fail_qty), 0), 2), 0) AS defect_rate_pct FROM fact_quality_inspection WHERE pass_qty IS NOT NULL AND fail_qty IS NOT NULL), scrap_stats AS (SELECT COALESCE((SELECT COUNT(*) FROM fact_scrap) * 100.0 / NULLIF((SELECT COUNT(*) FROM fact_work_order WHERE status = \'CLOSED\'), 0), 0) AS scrap_rate_pct), active_work_order_risk AS (SELECT COUNT(*) AS active_wo_count, COALESCE(SUM(CASE WHEN planned_qty > 0 AND (completed_qty / planned_qty) 7 THEN 1 ELSE 0 END), 0) AS lagging_wo_count FROM fact_work_order WHERE status IN (\'OPEN\', \'STARTED\')), global_metrics AS (SELECT pcs.avg_production_days, qis.defect_rate_pct, ss.scrap_rate_pct, awr.active_wo_count, awr.lagging_wo_count FROM production_cycle_stats pcs, quality_issue_stats qis, scrap_stats ss, active_work_order_risk awr) SELECT so.sales_order_number AS order_number, c.customer_name, so.order_date_utc::DATE AS order_date, so.deal_amount AS order_amount, so.payment_status, ROUND(gm.avg_production_days::NUMERIC, 1) AS avg_production_days, ROUND(COALESCE(lds.avg_logistics_delay_days, 0)::NUMERIC, 1) AS avg_logistics_delay_days, COALESCE(lds.delay_count, 0)::INT AS historical_delay_count, ROUND(gm.defect_rate_pct::NUMERIC, 2) AS defect_rate_pct, ROUND(gm.scrap_rate_pct::NUMERIC, 2) AS scrap_rate_pct, gm.active_wo_count::INT AS active_work_order_count, gm.lagging_wo_count::INT AS lagging_work_order_count, ROUND(LEAST(100, GREATEST(0, LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)))::NUMERIC, 1) AS delay_probability_pct, CASE WHEN (LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)) >= 60 THEN \'RED\' WHEN (LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)) >= 30 THEN \'YELLOW\' ELSE \'GREEN\' END AS warning_level, CASE WHEN gm.lagging_wo_count >= 2 THEN \'PRODUCTION_SEVERELY_DELAYED\' WHEN COALESCE(lds.avg_logistics_delay_days, 0) > 5 THEN \'HIGH_LOGISTICS_DELAY_RISK\' WHEN gm.avg_production_days > 15 THEN \'LONG_PRODUCTION_CYCLE\' WHEN gm.defect_rate_pct > 10 THEN \'QUALITY_ISSUES\' ELSE \'NORMAL\' END AS primary_risk_factor FROM fact_sales_order so LEFT JOIN dim_customer c ON so.customer_id = c.customer_id AND c.is_current = \'t\' CROSS JOIN global_metrics gm LEFT JOIN logistics_delay_stats lds ON so.customer_id = lds.customer_id WHERE EXTRACT(YEAR FROM so.order_date_utc) = 2025 ORDER BY delay_probability_pct DESC, so.order_date_utc DESC LIMIT 30')
parameters = kwargs.get('parameters', {})
# 请求体从kwargs获取或使用默认值
payload = {
"id": payload_id,
"businessName": business_name,
"businessDescription": business_description,
"datasourceId": datasource_id,
"sqlTemplate": sql_template,
"parameters": parameters
}
# 超时时间从kwargs获取或使用默认值
timeout = kwargs.get('timeout', 30)
try:
# 发送POST请求
response = requests.post(
url=url,
headers=headers,
json=payload,
timeout=timeout
)
# 检查HTTP响应状态码
response.raise_for_status()
# 解析响应结果
result = response.json()
# 返回接口调用结果
return {
"status": "success",
"status_code": response.status_code,
"result": result
}
except requests.exceptions.Timeout:
return {
"status": "error",
"error_type": "timeout",
"message": f"接口调用超时({timeout}秒)",
"result": None
}
except requests.exceptions.ConnectionError:
return {
"status": "error",
"error_type": "connection_error",
"message": "无法连接到目标服务器",
"result": None
}
except requests.exceptions.HTTPError as e:
return {
"status": "error",
"error_type": "http_error",
"status_code": response.status_code if 'response' in locals() else None,
"message": f"HTTP请求错误: {str(e)}",
"response_text": response.text if 'response' in locals() else "",
"result": None
}
except json.JSONDecodeError:
return {
"status": "error",
"error_type": "json_decode_error",
"message": "接口返回的不是有效的JSON格式",
"response_text": response.text if 'response' in locals() else "",
"result": None
}
except Exception as e:
return {
"status": "error",
"error_type": "unknown_error",
"message": f"接口调用失败: {str(e)}",
"result": None
}
if __name__ == "__main__":
result = main()
print(json.dumps(result, ensure_ascii=False, indent=2))