
I am using Spark 1.6.2.

I need to find the maximum count per group.

val myData = Seq(("aa1", "GROUP_A", "10"), ("aa1", "GROUP_A", "12"), ("aa2", "GROUP_A", "12"), ("aa3", "GROUP_B", "14"), ("aa3", "GROUP_B", "11"), ("aa3", "GROUP_B", "12"), ("aa2", "GROUP_B", "12"))

val df = sc.parallelize(myData).toDF("id","type","activity")

Let's first compute the number of observations per group:

df.groupBy("type","id").count.show

+-------+---+-----+
|   type| id|count|
+-------+---+-----+
|GROUP_A|aa1|    2|
|GROUP_A|aa2|    1|
|GROUP_B|aa2|    1|
|GROUP_B|aa3|    3|
+-------+---+-----+

This is the expected result:

+-------+---+-----+
|   type| id|count|
+-------+---+-----+
|GROUP_A|aa1|    2|
|GROUP_B|aa3|    3|
+-------+---+-----+

I tried this, but it does not work:

df.groupBy("type","id").count.filter("count = 'max'").show

3 Answers


To get the rows with the maximum value of column X (and not just that maximum value itself), you can use a small trick: "group" the relevant columns together into a struct, with the ordering column as the first field, and then take the max of that struct. Since the ordering of structs is dominated by the ordering of their first field, we get the desired result:

import org.apache.spark.sql.functions.{max, struct}

df.groupBy("id", "type").count()               // get count per (id, type)
  .groupBy("type")                             // now group by type only
  .agg(max(struct("count", "id")) as "struct") // get maximum of (count, id) structs - since count comes first and id is unique, count decides the ordering
  .select($"type", $"struct.id" as "id", $"struct.count" as "count") // "unwrap" the structs
  .show()

// +-------+---+-----+
// |   type| id|count|
// +-------+---+-----+
// |GROUP_A|aa1|    2|
// |GROUP_B|aa3|    3|
// +-------+---+-----+
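
As a quick sanity check of the struct-ordering claim (a minimal sketch with made-up data, not part of the original answer): max over a struct column picks the lexicographically greatest struct, so the first field decides whenever it differs.

import org.apache.spark.sql.functions.{max, struct}

// Hypothetical two-row probe: (count = 2, id = "aa1") vs. (count = 1, id = "aa2").
val probe = sc.parallelize(Seq(("aa1", 2), ("aa2", 1))).toDF("id", "count")

// The struct with the larger first field wins, regardless of the second field.
probe.agg(max(struct("count", "id")) as "winner").show(false)
// expected: a single row holding the struct (2, aa1); the exact rendering
// (e.g. [2,aa1]) varies between Spark versions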
answered 2017-05-11T16:04:27.453

You can use the max function after grouping.

val myData = Seq(("aa1", "GROUP_A", "10"), ("aa1", "GROUP_A", "12"), ("aa2", "GROUP_A", "12"), ("aa3", "GROUP_B", "14"), ("aa3", "GROUP_B", "11"), ("aa3", "GROUP_B", "12"), ("aa2", "GROUP_B", "12"))

val df = sc.parallelize(myData).toDF("id","type","activity")

// Count after groupBy and alias the count column as cnt; then find the maximum of cnt per type.

import org.apache.spark.sql.functions._

val newDF = df.groupBy("type", "id").agg(count("*").alias("cnt"))

val df1 = newDF.groupBy("type").max("cnt")
df1.show

Now you can join the two DataFrames to get the output.

df1.join(newDF.as("newDF"), col("cnt") === col("max(cnt)")).select($"newDF.*").show
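
A possible refinement (a sketch of my own, not part of this answer): joining on the count alone can cross-match groups when a count value happens to equal the maximum of a different group, so one can alias the maximum and also match on type:

import org.apache.spark.sql.functions.max

// Alias the per-type maximum so it can be referenced unambiguously in the join.
val maxPerType = newDF.groupBy("type").agg(max("cnt").alias("max_cnt"))

newDF.as("n")
  .join(maxPerType.as("m"), $"n.type" === $"m.type" && $"n.cnt" === $"m.max_cnt")
  .select($"n.*")
  .show()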
answered 2017-05-11T16:00:30.027

Combining this with @Tzach's answer above, you can use a Window function to find the max and then drop the duplicates:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy(col("type"))
df.groupBy("type", "id").count()
  .withColumn("count", max(struct("count", "id")).over(windowSpec))
  .dropDuplicates("type")
  .select($"type", $"count.id" as "id", $"count.count" as "count")
  .show()

Thanks

answered 2017-05-11T16:09:42.317