我有一段 PySpark 代码,它从 Snowflake 数据库中读取数据,
然后执行多个转换操作,例如 groupBy、join,
以及列运算(如 col(a) - col(b))。
但是,随着转换链越来越长,执行似乎卡住了,无法通过 show()
得到结果。
这是我通过启用 spark.driver.log.persistToDfs.enabled 收集到的日志:
... ###### spark initialization log stuffs##### ...
22/02/25 12:06:21 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:21 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:21 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:21 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f789-0000-938105d59f8e; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:21 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398799014 queryId: 01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:22 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398799014 queryId: 01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:23 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=2.33 seconds QueryID=01a289f6-3200-f789-0000-938105d59f8e
22/02/25 12:06:23 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:25 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:25 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:25 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:25 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f789-0000-938105d59f9a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:25 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398959210 queryId: 01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398959210 queryId: 01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB QueryTime=1.16 seconds QueryID=01a289f6-3200-f789-0000-938105d59f9a
22/02/25 12:06:26 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:27 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:28 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:29 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:29 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:29 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f703-0000-938105d5e63e; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:29 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398965230 queryId: 01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398965230 queryId: 01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=1.25 seconds QueryID=01a289f6-3200-f703-0000-938105d5e63e
22/02/25 12:06:30 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:32 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:33 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:33 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_0" FROM ( SELECT ( CONCAT ( "SUBQUERY_1"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_1"."PLANT_ID" ) ) ) AS "SUBQUERY_2_COL_0" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PSO, MATERIAL_ID, PLANT_ID, STANDARD_PRICE, PRICE_UNITS, CURRENCY from DARE_PRE_HM.DARE_MATERIAL_LOCATION_MD where PSO = 'AM' ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( CONCAT ( "SUBQUERY_0"."MATERIAL_ID" , CONCAT ( '_' , "SUBQUERY_0"."PLANT_ID" ) ) IS NOT NULL ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_0"
22/02/25 12:06:33 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f81e-0000-938105d5af3a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398951130 queryId: 01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398951130 queryId: 01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB QueryTime=903 ms QueryID=01a289f6-3200-f81e-0000-938105d5af3a
22/02/25 12:06:33 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=551813 compressSize=4.51 MB unCompressSize=9.67 MB
22/02/25 12:06:35 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:35 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT * FROM ( SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_4" ) AS "SUBQUERY_3_COL_0" , ( "SUBQUERY_2"."SUBQUERY_2_COL_5" ) AS "SUBQUERY_3_COL_1" , ( "SUBQUERY_2"."SUBQUERY_2_COL_2" ) AS "SUBQUERY_3_COL_2" , ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_3" , ( SUM ( "SUBQUERY_2"."SUBQUERY_2_COL_3" ) ) AS "SUBQUERY_3_COL_4" FROM ( SELECT ( "SUBQUERY_1"."PLANT_ID" ) AS "SUBQUERY_2_COL_0" , ( "SUBQUERY_1"."DAY" ) AS "SUBQUERY_2_COL_1" , ( "SUBQUERY_1"."MATERIAL_ID" ) AS "SUBQUERY_2_COL_2" , ( "SUBQUERY_1"."MOVEMENT_QTY" ) AS "SUBQUERY_2_COL_3" , ( LPAD ( "SUBQUERY_1"."PO_NUMBER" , 10 , '0' ) ) AS "SUBQUERY_2_COL_4" , ( LPAD ( CAST ( "SUBQUERY_1"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_2_COL_5" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PLANT_ID, DAY, MATERIAL_ID, MOVEMENT_QTY, PO_NUMBER, PO_ITEM from DARE_L1.BW_MATERIAL_MOVEMENTS where DAY > add_months(current_date() - 1, -5) AND (PO_NUMBER != '' or PO_NUMBER is not null) AND movement_type in ('101','102') ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( ( ( ( LPAD ( "SUBQUERY_0"."PO_NUMBER" , 10 , '0' ) IS NOT NULL ) AND ( LPAD ( CAST ( "SUBQUERY_0"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) IS NOT NULL ) ) AND ( "SUBQUERY_0"."MATERIAL_ID" IS NOT NULL ) ) AND ( "SUBQUERY_0"."PLANT_ID" IS NOT NULL ) ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_4" , "SUBQUERY_2"."SUBQUERY_2_COL_5" , "SUBQUERY_2"."SUBQUERY_2_COL_2" , "SUBQUERY_2"."SUBQUERY_2_COL_0" , "SUBQUERY_2"."SUBQUERY_2_COL_1" ) AS "SUBQUERY_3" WHERE ( ( "SUBQUERY_3"."SUBQUERY_3_COL_4" != 0 ) AND ( "SUBQUERY_3"."SUBQUERY_3_COL_4" IS NOT NULL ) )
22/02/25 12:06:35 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT * FROM ( SELECT ( "SUBQUERY_2"."SUBQUERY_2_COL_4" ) AS "SUBQUERY_3_COL_0" , ( "SUBQUERY_2"."SUBQUERY_2_COL_5" ) AS "SUBQUERY_3_COL_1" , ( "SUBQUERY_2"."SUBQUERY_2_COL_2" ) AS "SUBQUERY_3_COL_2" , ( "SUBQUERY_2"."SUBQUERY_2_COL_0" ) AS "SUBQUERY_3_COL_3" , ( SUM ( "SUBQUERY_2"."SUBQUERY_2_COL_3" ) ) AS "SUBQUERY_3_COL_4" FROM ( SELECT ( "SUBQUERY_1"."PLANT_ID" ) AS "SUBQUERY_2_COL_0" , ( "SUBQUERY_1"."DAY" ) AS "SUBQUERY_2_COL_1" , ( "SUBQUERY_1"."MATERIAL_ID" ) AS "SUBQUERY_2_COL_2" , ( "SUBQUERY_1"."MOVEMENT_QTY" ) AS "SUBQUERY_2_COL_3" , ( LPAD ( "SUBQUERY_1"."PO_NUMBER" , 10 , '0' ) ) AS "SUBQUERY_2_COL_4" , ( LPAD ( CAST ( "SUBQUERY_1"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_2_COL_5" FROM ( SELECT * FROM ( SELECT * FROM ( ( select PLANT_ID, DAY, MATERIAL_ID, MOVEMENT_QTY, PO_NUMBER, PO_ITEM from DARE_L1.BW_MATERIAL_MOVEMENTS where DAY > add_months(current_date() - 1, -5) AND (PO_NUMBER != '' or PO_NUMBER is not null) AND movement_type in ('101','102') ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0" WHERE ( ( ( ( LPAD ( "SUBQUERY_0"."PO_NUMBER" , 10 , '0' ) IS NOT NULL ) AND ( LPAD ( CAST ( "SUBQUERY_0"."PO_ITEM" AS VARCHAR ) , 6 , '0' ) IS NOT NULL ) ) AND ( "SUBQUERY_0"."MATERIAL_ID" IS NOT NULL ) ) AND ( "SUBQUERY_0"."PLANT_ID" IS NOT NULL ) ) ) AS "SUBQUERY_1" ) AS "SUBQUERY_2" GROUP BY "SUBQUERY_2"."SUBQUERY_2_COL_4" , "SUBQUERY_2"."SUBQUERY_2_COL_5" , "SUBQUERY_2"."SUBQUERY_2_COL_2" , "SUBQUERY_2"."SUBQUERY_2_COL_0" , "SUBQUERY_2"."SUBQUERY_2_COL_1" ) AS "SUBQUERY_3" WHERE ( ( "SUBQUERY_3"."SUBQUERY_3_COL_4" != 0 ) AND ( "SUBQUERY_3"."SUBQUERY_3_COL_4" IS NOT NULL ) )
22/02/25 12:06:35 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f7ca-0000-938105d5d93a; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:35 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398965246 queryId: 01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398965246 queryId: 01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=1 rowCount=586515 compressSize=13.60 MB unCompressSize=34.75 MB QueryTime=1.01 seconds QueryID=01a289f6-3200-f7ca-0000-938105d5d93a
22/02/25 12:06:36 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=586515 compressSize=13.60 MB unCompressSize=34.75 MB
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:36 INFO SnowflakeTelemetry$: Pushdown fails because of operation: first @ AggregationStatement
message: pushdown failed for aggregate expression
isKnown: false
22/02/25 12:06:37 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:37 INFO SnowflakeRelation: Now executing below command to read from snowflake:
SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:37 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: SELECT ( "SUBQUERY_0"."DOC_NUM" ) AS "SUBQUERY_1_COL_0" , ( LPAD ( CAST ( "SUBQUERY_0"."DOC_ITEM" AS VARCHAR ) , 6 , '0' ) ) AS "SUBQUERY_1_COL_1" , ( LPAD ( CAST ( "SUBQUERY_0"."SCHED_LINE" AS VARCHAR ) , 4 , '0' ) ) AS "SUBQUERY_1_COL_2" , ( "SUBQUERY_0"."MATERIAL" ) AS "SUBQUERY_1_COL_3" , ( "SUBQUERY_0"."PLANT" ) AS "SUBQUERY_1_COL_4" , ( "SUBQUERY_0"."SCL_DELDAT" ) AS "SUBQUERY_1_COL_5" , ( "SUBQUERY_0"."VENDOR" ) AS "SUBQUERY_1_COL_6" , ( "SUBQUERY_0"."SUPP_PLANT" ) AS "SUBQUERY_1_COL_7" , ( "SUBQUERY_0"."CPQUAOU" ) AS "SUBQUERY_1_COL_8" FROM ( SELECT * FROM ( ( select PO_NUMBER as doc_num, PO_ITEM as doc_item, PO_SCHED_LINE as sched_line, MATERIAL_ID as material, PLANT_ID as plant, REQUESTED_DELIVERY_DATE as scl_deldat, VENDOR_ID as vendor, SUPPLYING_PLANT_ID as supp_plant, REQUESTED_QTY as cpquaou from DARE_L1.BW_MM_PURCHASING where scl_deldat > add_months(current_date() - 1, -5) and sys_date = (select max(sys_date) from DARE_L1.BW_MM_PURCHASING) and (NOT del_flag = 'R' or del_flag is NULL) ) ) AS "SF_CONNECTOR_QUERY_ALIAS" ) AS "SUBQUERY_0"
22/02/25 12:06:38 INFO SnowflakeRelation: The query ID for async reading from snowflake is: 01a289f6-3200-f7c4-0000-938105d5cb16; The query ID URL is:
example.snowflake.com/detail?queryId=01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SparkConnectorContext$: Add running query for local-1645761951635 session: 162182398799026 queryId: 01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SparkConnectorContext$: Remove running query for local-1645761951635 session: 162182398799026 queryId: 01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SnowflakeRelation: Spark Connector Master: Total statistics: partitionCount=2 rowCount=1583212 compressSize=49.01 MB unCompressSize=126.37 MB QueryTime=978 ms QueryID=01a289f6-3200-f7c4-0000-938105d5cb16
22/02/25 12:06:38 INFO SnowflakeRelation: Spark Connector Master: Average statistics per partition: rowCount=791606 compressSize=24.50 MB unCompressSize=63.18 MB
22/02/25 12:06:40 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Singapore' , timestamp_ntz_output_format = 'YYYY-MM-DD HH24:MI:SS.FF3', timestamp_ltz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3', timestamp_tz_output_format = 'TZHTZM YYYY-MM-DD HH24:MI:SS.FF3' ;
22/02/25 12:06:41 INFO SnowflakeRelation: Now executing below command to read from snowflake:
...
...
...
...
只要我让 PySpark 脚本继续运行,这些日志就会一直重复出现同样的内容。
有人知道为什么会发生这种情况吗?
注意:当我减少转换操作的数量时,show() 能够正常返回
结果;但当我在代码中添加更多数据转换时,就会出现此问题。
*由于 Stack Overflow 的字数限制,我无法在此处贴出示例代码,我会再想办法分享部分代码。