3

我在 pyspark 中有一个数据框,它有数亿行(这里是它的一个虚拟样本):

import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window,Row
from pyspark.sql.functions import col
from pyspark.sql.functions import month, mean,sum,year,avg
from pyspark.sql.functions import concat_ws,to_date,unix_timestamp,datediff,lit
from pyspark.sql.functions import when,min,max,desc,row_number,col

# Build a small sample DataFrame `dg` from hand-written Rows.
# Every Row carries the same six fields: cycle_dt (a datetime), network_id,
# norm_strength, spend_active_ind (only 0 or 1 in this sample),
# net_spending_amt and cust_xref_id (the key later used for grouping).
# NOTE(review): `sqlContext` and `sc` are not defined in this snippet;
# presumably they are supplied by the PySpark shell/notebook -- confirm.
dg = sqlContext.createDataFrame(sc.parallelize([
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=0,cust_xref_id=10),
Row(cycle_dt=datetime.datetime(1984, 6, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=11),
Row(cycle_dt=datetime.datetime(1984, 7, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=12),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=13),
Row(cycle_dt=datetime.datetime(1983,11, 5, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=8,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1983,12, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=15,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=7,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984,10, 6, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=10,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=1,net_spending_amt=5,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=8,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=0,net_spending_amt=2,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=1,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=9,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=3,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=5,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=6,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=9,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=1,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=7,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=1,net_spending_amt=3,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=0,net_spending_amt=9,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=2,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=1,net_spending_amt=8,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=1,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=9,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=3,cust_xref_id=9 )
]))

我想对每个 cust_xref_id 的 spend_active_ind 求和,并只保留总和大于零的那些客户的记录。一种方法是先 groupby 再 join:

# Approach 1: aggregate per customer, then join the surviving ids back.
# Step 1 -- total of the activity flag for each cust_xref_id.
totals = dg.groupby("cust_xref_id").agg(
    F.sum("spend_active_ind").alias("sum_spend_active_ind")
)
# Step 2 -- keep only the ids whose total is non-zero.
dg1 = totals.filter(F.col("sum_spend_active_ind") != 0).select("cust_xref_id")
# Step 3 -- inner-join on the id and keep only the original (t1) columns.
dg = (
    dg.alias("t1")
    .join(
        dg1.alias("t2"),
        F.col("t1.cust_xref_id") == F.col("t2.cust_xref_id"),
    )
    .select(F.col("t1.*"))
)

我能想到的另一种方法是使用窗口:

# Approach 2: compute the per-customer total as a window aggregate so that
# no join is needed, then drop rows whose customer total is zero.
w = Window.partitionBy("cust_xref_id")
dg = (
    dg.withColumn("sum_spend_active_ind", F.sum(F.col("spend_active_ind")).over(w))
    .filter(F.col("sum_spend_active_ind") != 0)
)

对于我想要做的事情,这些方法中的哪一种(或者其他任何方法)更高效?谢谢。

4

1 回答 1

-1

您可以尝试在 localhost:4040 打开您的 Spark UI,或者使用 explain() 按以下方式查看查询计划:

# Build the per-customer aggregate, then print the query plan of the
# filtered result.
active_totals = dg.groupby('cust_xref_id').agg(
    F.sum('spend_active_ind').alias('sum_spend_active_ind')
)
active_totals.filter(F.col('sum_spend_active_ind') > 0).explain()
于 2019-11-06T22:10:28.730 回答