下面的脚本 (Spark 1.6) 中止 java.lang.NullPointerException,主要是由于函数 LAG。请指教。
from pyspark.sql import HiveContext
sqlc= HiveContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])
df = sqlc.createDataFrame(rdd, ["account_nbr", "date_time"])
df.registerTempTable("test1")
df2 = sqlc.sql("select a.*, case when lag(a.date_time) is NULL then 0 else lag(a.date_time) end as prev_date_time from test1 a")
df2.toPandas()
另一种方法是在 pyspark.sql.functions 下使用函数 when 和 isnull ,如果 isnull 则将延迟降至 0。
df = df.withColumn("prv_date_time", F.lag(df.date_time).over(my_window))
df = df.withColumn("prv_account_nbr", F.lag(df.account_nbr).over(my_window))
df = df.withColumn("diff_sec", F.when(F.isnull(df.date_time - df.prv_date_time), 0)
.otherwise(df.date_time - df.prv_date_time))