import requests import json def main(**kwargs) -> dict: """ 函数节点的入口函数 Args: **kwargs: 从前端配置的参数传入,可通过变量引用获取工作流上下文 Returns: dict: 返回结果将作为节点输出,可被后续节点引用 """ # 从 kwargs 获取参数,如果没有提供则使用默认值 url = kwargs.get('url', 'http://192.167.30.2:8088/datasource/sqlExecutionLog/testBatchSqlWithSchema') # 请求头(从kwargs获取或使用默认值) authorization_token = kwargs.get('authorization_token', 'Bearer eyJhbGciOiJIUzUxMiJ9.eyJ0b2tlbl90eXBlIjoiTE9HSU4iLCJsb2dpbl91c2VyX2tleSI6IjAxNzZiNjEyLTc2YWItNDdhMS1iYTRiLTdjNWU2ZTMxNDlmZCJ9.w7aOfDJDHtA4bwNKIvUVK2cf1yO_2F27d_eYuos-p1-XGrSQOX0D4ny0b8Js36MhXwBnF4GDcy8V1VobEN6zBA') headers = { 'Authorization': authorization_token } # 请求体参数(从kwargs获取或使用默认值) payload_id = kwargs.get('id', '2006300000000000001') business_name = kwargs.get('businessName', 'OrderDelayWarningAnalysis') business_description = kwargs.get('businessDescription', '订单延迟预警分析:依据历史订单的生产周期、物流延误、设备故障等特征,输出延迟概率与红/黄/绿预警等级') datasource_id = kwargs.get('datasourceId', '57') sql_template = kwargs.get('sqlTemplate', 'WITH production_cycle_stats AS (SELECT COALESCE(AVG(GREATEST(0, EXTRACT(DAY FROM last_updated_utc - event_time_utc))), 0) AS avg_production_days FROM fact_work_order WHERE status = \'CLOSED\' AND last_updated_utc >= event_time_utc), logistics_delay_stats AS (SELECT customer_id, AVG(GREATEST(0, EXTRACT(DAY FROM event_time_utc - doc_date_utc))) AS avg_logistics_delay_days, SUM(CASE WHEN EXTRACT(DAY FROM event_time_utc - doc_date_utc) > 3 THEN 1 ELSE 0 END) AS delay_count FROM fact_sales_shipment WHERE doc_date_utc IS NOT NULL AND event_time_utc IS NOT NULL GROUP BY customer_id), quality_issue_stats AS (SELECT COALESCE(ROUND(SUM(fail_qty) * 100.0 / NULLIF(SUM(pass_qty + fail_qty), 0), 2), 0) AS defect_rate_pct FROM fact_quality_inspection WHERE pass_qty IS NOT NULL AND fail_qty IS NOT NULL), scrap_stats AS (SELECT COALESCE((SELECT COUNT(*) FROM fact_scrap) * 100.0 / NULLIF((SELECT COUNT(*) FROM fact_work_order WHERE status = \'CLOSED\'), 0), 0) AS scrap_rate_pct), active_work_order_risk AS (SELECT COUNT(*) AS active_wo_count, COALESCE(SUM(CASE WHEN planned_qty > 0 AND (completed_qty / planned_qty) 7 THEN 1 ELSE 0 END), 0) AS lagging_wo_count FROM fact_work_order WHERE status IN (\'OPEN\', \'STARTED\')), global_metrics AS (SELECT pcs.avg_production_days, qis.defect_rate_pct, ss.scrap_rate_pct, awr.active_wo_count, awr.lagging_wo_count FROM production_cycle_stats pcs, quality_issue_stats qis, scrap_stats ss, active_work_order_risk awr) SELECT so.sales_order_number AS order_number, c.customer_name, so.order_date_utc::DATE AS order_date, so.deal_amount AS order_amount, so.payment_status, ROUND(gm.avg_production_days::NUMERIC, 1) AS avg_production_days, ROUND(COALESCE(lds.avg_logistics_delay_days, 0)::NUMERIC, 1) AS avg_logistics_delay_days, COALESCE(lds.delay_count, 0)::INT AS historical_delay_count, ROUND(gm.defect_rate_pct::NUMERIC, 2) AS defect_rate_pct, ROUND(gm.scrap_rate_pct::NUMERIC, 2) AS scrap_rate_pct, gm.active_wo_count::INT AS active_work_order_count, gm.lagging_wo_count::INT AS lagging_work_order_count, ROUND(LEAST(100, GREATEST(0, LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)))::NUMERIC, 1) AS delay_probability_pct, CASE WHEN (LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)) >= 60 THEN \'RED\' WHEN (LEAST(25, GREATEST(0, gm.avg_production_days - 10) * 2.5) + LEAST(30, COALESCE(lds.avg_logistics_delay_days, 0) * 6) + LEAST(25, gm.defect_rate_pct * 2.5) + LEAST(20, gm.lagging_wo_count * 10)) >= 30 THEN \'YELLOW\' ELSE \'GREEN\' END AS warning_level, CASE WHEN gm.lagging_wo_count >= 2 THEN \'PRODUCTION_SEVERELY_DELAYED\' WHEN COALESCE(lds.avg_logistics_delay_days, 0) > 5 THEN \'HIGH_LOGISTICS_DELAY_RISK\' WHEN gm.avg_production_days > 15 THEN \'LONG_PRODUCTION_CYCLE\' WHEN gm.defect_rate_pct > 10 THEN \'QUALITY_ISSUES\' ELSE \'NORMAL\' END AS primary_risk_factor FROM fact_sales_order so LEFT JOIN dim_customer c ON so.customer_id = c.customer_id AND c.is_current = \'t\' CROSS JOIN global_metrics gm LEFT JOIN logistics_delay_stats lds ON so.customer_id = lds.customer_id WHERE EXTRACT(YEAR FROM so.order_date_utc) = 2025 ORDER BY delay_probability_pct DESC, so.order_date_utc DESC LIMIT 30') parameters = kwargs.get('parameters', {}) # 请求体(从kwargs获取或使用默认值) payload = { "id": payload_id, "businessName": business_name, "businessDescription": business_description, "datasourceId": datasource_id, "sqlTemplate": sql_template, "parameters": parameters } # 超时时间(从kwargs获取或使用默认值) timeout = kwargs.get('timeout', 30) try: # 发送POST请求 response = requests.post( url=url, headers=headers, json=payload, timeout=timeout ) # 检查HTTP响应状态码 response.raise_for_status() # 解析响应结果 result = response.json() # 返回接口调用结果 return { "status": "success", "status_code": response.status_code, "result": result } except requests.exceptions.Timeout: return { "status": "error", "error_type": "timeout", "message": f"接口调用超时({timeout}秒)", "result": None } except requests.exceptions.ConnectionError: return { "status": "error", "error_type": "connection_error", "message": "无法连接到目标服务器", "result": None } except requests.exceptions.HTTPError as e: return { "status": "error", "error_type": "http_error", "status_code": response.status_code if 'response' in locals() else None, "message": f"HTTP请求错误: {str(e)}", "response_text": response.text if 'response' in locals() else "", "result": None } except json.JSONDecodeError: return { "status": "error", "error_type": "json_decode_error", "message": "接口返回的不是有效的JSON格式", "response_text": response.text if 'response' in locals() else "", "result": None } except Exception as e: return { "status": "error", "error_type": "unknown_error", "message": f"接口调用失败: {str(e)}", "result": None } if __name__ == "__main__": result = main() print(json.dumps(result, ensure_ascii=False, indent=2))