
I have a set of PySpark jobs that read from a Snowflake database and then perform a number of operations, such as groupBy, join, and column operations like col(a) - col(b).

However, as the chain of transformations grows longer, execution seems to get stuck and show() never returns a result.

Here is the log I collected (via spark.driver.log.persistToDfs.enabled):

... ###### Spark initialization log (omitted) ###### ...

22/02/25 12:06:21 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:21 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:21 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" 
22/02/25 12:06:21 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f789-0000-938105d59f8e; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:21 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398799014 queryId: 01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:22 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398799014 queryId: 01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:23 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=2.33 seconds QueryID=01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:23 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:25 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:25 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:25 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0" 
22/02/25 12:06:25 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f789-0000-938105d59f9a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:25 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398959210 queryId: 01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398959210 queryId: 01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB QueryTime=1.16 seconds QueryID=01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
 message: pushdown failed for aggregate expression
 isKnown: false
           
[the message above repeats 10 times in total]
22/02/25 12:06:28 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:29 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:29 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" 
22/02/25 12:06:29 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f703-0000-938105d5e63e; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:29 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398965230 queryId: 01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398965230 queryId: 01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=1.25 seconds QueryID=01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:32 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:33 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:33 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0" 
22/02/25 12:06:33 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f81e-0000-938105d5af3a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398951130 queryId: 01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398951130 queryId: 01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB QueryTime=903 ms QueryID=01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB
22/02/25 12:06:35 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:35 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT * FROM ( SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_4" ) AS "SUBQUERY_3_COL_0" , ( "SUBQUERY_2"."SUBQUERY_2_COL_5" ) AS "SUBQUERY_3_COL_1" , ( "SUBQUERY_2"."SUBQUERY_2_COL_2" ) AS "SUBQUERY_3_COL_2" , ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_3" , ( SUM ( "SUBQUERY_2"."SUBQUERY_2_COL_3" ) ) AS "SUBQUERY_3_COL_4" FROM ( SELECT ( "SUBQUERY_1"."PLANT_ID" ) AS "SUBQUERY_2_COL_0" , ( "SUBQUERY_1"."DAY" ) AS "SUBQUERY_2_COL_1" , ( "SUBQUERY_1"."MATERIAL_ID" ) AS "SUBQUERY_2_COL_2" , ( "SUBQUERY_1"."MOVEMENT_QTY" ) AS "SUBQUERY_2_COL_3" , ( LPAD ( "SUBQUERY_1"."PO_NUMBER" , 10 , '0' ) ) AS "SUBQUERY_2_COL_4" , ( LPAD ( CAST ( "SUBQUERY_1"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_2_COL_5" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PLANT_ID, DAY, MATERIAL_ID, MOVEMENT_QTY, PO_NUMBER, PO_ITEM from DARE_L1.BW_MATERIAL_MOVEMENTS where DAY > add_months(current_date() - 1, -5) AND (PO_NUMBER != '' or PO_NUMBER is not null) AND movement_type in ('101','102') ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( ( ( ( LPAD ( "SUBQUERY_0"."PO_NUMBER" , 10 , '0' ) IS NOT NULL ) AND ( LPAD ( CAST ( "SUBQUERY_0"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) IS NOT NULL ) ) AND ( "SUBQUERY_0"."MATERIAL_ID" IS NOT NULL ) ) AND ( "SUBQUERY_0"."PLANT_ID" IS NOT NULL ) ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_4" , "SUBQUERY_2"."SUBQUERY_2_COL_5" , "SUBQUERY_2"."SUBQUERY_2_COL_2" , "SUBQUERY_2"."SUBQUERY_2_COL_0" , "SUBQUERY_2"."SUBQUERY_2_COL_1" ) AS "SUBQUERY_3" WHERE ( ( "SUBQUERY_3"."SUBQUERY_3_COL_4" != 0 ) AND ( "SUBQUERY_3"."SUBQUERY_3_COL_4" IS NOT NULL ) )
22/02/25 12:06:35 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT * FROM ( SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_4" ) AS "SUBQUERY_3_COL_0" , ( "SUBQUERY_2"."SUBQUERY_2_COL_5" ) AS "SUBQUERY_3_COL_1" , ( "SUBQUERY_2"."SUBQUERY_2_COL_2" ) AS "SUBQUERY_3_COL_2" , ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_3" , ( SUM ( "SUBQUERY_2"."SUBQUERY_2_COL_3" ) ) AS "SUBQUERY_3_COL_4" FROM ( SELECT ( "SUBQUERY_1"."PLANT_ID" ) AS "SUBQUERY_2_COL_0" , ( "SUBQUERY_1"."DAY" ) AS "SUBQUERY_2_COL_1" , ( "SUBQUERY_1"."MATERIAL_ID" ) AS "SUBQUERY_2_COL_2" , ( "SUBQUERY_1"."MOVEMENT_QTY" ) AS "SUBQUERY_2_COL_3" , ( LPAD ( "SUBQUERY_1"."PO_NUMBER" , 10 , '0' ) ) AS "SUBQUERY_2_COL_4" , ( LPAD ( CAST ( "SUBQUERY_1"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_2_COL_5" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PLANT_ID, DAY, MATERIAL_ID, MOVEMENT_QTY, PO_NUMBER, PO_ITEM from DARE_L1.BW_MATERIAL_MOVEMENTS where DAY > add_months(current_date() - 1, -5) AND (PO_NUMBER != '' or PO_NUMBER is not null) AND movement_type in ('101','102') ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( ( ( ( LPAD ( "SUBQUERY_0"."PO_NUMBER" , 10 , '0' ) IS NOT NULL ) AND ( LPAD ( CAST ( "SUBQUERY_0"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) IS NOT NULL ) ) AND ( "SUBQUERY_0"."MATERIAL_ID" IS NOT NULL ) ) AND ( "SUBQUERY_0"."PLANT_ID" IS NOT NULL ) ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_4" , "SUBQUERY_2"."SUBQUERY_2_COL_5" , "SUBQUERY_2"."SUBQUERY_2_COL_2" , "SUBQUERY_2"."SUBQUERY_2_COL_0" , "SUBQUERY_2"."SUBQUERY_2_COL_1" ) AS "SUBQUERY_3" WHERE ( ( "SUBQUERY_3"."SUBQUERY_3_COL_4" != 0 ) AND ( "SUBQUERY_3"."SUBQUERY_3_COL_4" IS NOT NULL ) ) 
22/02/25 12:06:35 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f7ca-0000-938105d5d93a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:35 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398965246 queryId: 01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398965246 queryId: 01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=586515 compressSize=13.60 MB unCompressSize=34.75 MB QueryTime=1.01 seconds QueryID=01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=586515 compressSize=13.60 MB unCompressSize=34.75 MB
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
 message: pushdown failed for aggregate expression
 isKnown: false
           
[the message above repeats 19 times in total]
22/02/25 12:06:37 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:37 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:37 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" 
22/02/25 12:06:38 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f7c4-0000-938105d5cb16; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398799026 queryId: 01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398799026 queryId: 01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=978 ms QueryID=01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:40 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ; 
22/02/25 12:06:41 INFO SnowflakeRelation: Now executing below command to read from snowflake:
...
...
...
...

As long as I keep the PySpark script running, this log goes on and on, repeating the same things.

Any ideas why this is happening?

Note: when I reduce the number of transformations, show() is able to return a result. The problem only occurs once I add more transformations to the code.

*Due to Stack Overflow's character limit, I cannot post the sample code in this section. I will look into how I can share some of the code.


1 Answer


I am not sure what is causing the issue, but it might be related to the message "Pushdown fails because of operation: first @ AggregationStatement".

It means that you are using some function or expression on the DataFrame that cannot be translated to SQL. You can find the functions that support pushdown at https://docs.snowflake.com/en/user-guide/spark-connector-use.html#pushdown

However, it might be something completely different that is causing the issue, so another way to gain some understanding is to look at the Snowflake side and see what is actually pushed down, i.e. which SQL has been executed.
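On the Snowflake side, the query history shows the exact SQL the connector issued, for example (assuming your role has access to the session's history):

```sql
-- Most recent queries visible to the current session/user
select query_id, query_text, execution_status, total_elapsed_time
from table(information_schema.query_history())
order by start_time desc
limit 50;
```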

Finally, without the code it is very hard to give more precise help.

Answered 2022-02-28T09:59:04.460