0

下面的脚本 (Spark 1.6) 中止 java.lang.NullPointerException,主要是由于函数 LAG。请指教。

from pyspark.sql import HiveContext
sqlc= HiveContext(sc) 

rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])

df = sqlc.createDataFrame(rdd, ["account_nbr", "date_time"])
df.registerTempTable("test1")

df2 = sqlc.sql("select a.*, case when lag(a.date_time) is NULL then 0 else  lag(a.date_time) end  as prev_date_time from test1 a")
df2.toPandas()

另一种方法是在 pyspark.sql.functions 下使用函数 when 和 isnull ,如果 isnull 则将延迟降至 0。

df = df.withColumn("prv_date_time", F.lag(df.date_time).over(my_window))
df = df.withColumn("prv_account_nbr", F.lag(df.account_nbr).over(my_window))
df = df.withColumn("diff_sec", F.when(F.isnull(df.date_time -     df.prv_date_time), 0)
                          .otherwise(df.date_time - df.prv_date_time))
4

1 回答 1

0

Lag 是一个分析函数,它使您可以同时访问多于 1 行,因此您需要对某个值进行排序。例如,这有效:

rdd = sc.parallelize([(1, 65), (2, 66), (3, 65), (4, 68), (5, 71)])

df = sqlc.createDataFrame(rdd, ["account_nbr", "date_time"])
df.registerTempTable("test1")

df2 = sqlc.sql("select a.*, nvl(lag(a.date_time) over (order by a.date_time),0) as lag_date_time from test1 a")

还要注意使用 NVL 来处理空值(在这种情况下,如果您希望将它们替换为 0)

输出:

+-----------+---------+-------------+
|account_nbr|date_time|lag_date_time|
+-----------+---------+-------------+
|          1|       65|            0|
|          3|       65|           65|
|          2|       66|           65|
|          4|       68|           66|
|          5|       71|           68|
+-----------+---------+-------------+
于 2017-06-09T18:58:49.713 回答