7

如何从 group by 中获取第一个非空值?我尝试使用first with coalesce F.first(F.coalesce("code"))但我没有得到想要的行为(我似乎得到了第一行)。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

我试过了:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())

期望的输出

[Row(id='a', code='code1', name='name2')]
4

2 回答 2

19

对于 Spark 1.3 - 1.5,这可以解决问题:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

编辑

显然,在 1.6 版中,它们改变了first聚合函数的处理方式。First现在,应该使用第二个参数构造底层类ignoreNullsExpr,该参数尚未被first聚合函数使用(如可以在此处看到的)。但是,在 Spark 2.0 中,它将能够调用agg(F.first(col, True))忽略空值(可以在此处检查)。

因此,不幸的是,对于 Spark 1.6,方法必须有所不同,而且效率会低一些。一个想法如下:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

也许有更好的选择。如果我找到答案,我会编辑答案。

于 2016-05-20T12:06:14.383 回答
4

因为我对每个分组只有一个非空值,所以在 1.6 中使用 min / max 可以达到我的目的:

(df
  .groupby("id")
  .agg(F.min("code"),
       F.min("name"))
  .show())

+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
|  a|    code1|    name2|
+---+---------+---------+
于 2016-06-18T16:51:31.027 回答