0

下面是我的数据集。

user,device,time_spent,video_start
userA,mob,5,1
userA,desk,5,2
userA,desk,5,3
userA,mob,5,2
userA,mob,5,2
userB,desk,5,2
userB,mob,5,2
userB,mob,5,2
userB,desk,5,2

我想找出每个用户的以下聚合。

   user     total_time_spent        device_distribution
   userA           20                {mob:60%,desk:40%}
   userB           20                {mob:50%,desk:50%}

有人可以帮助我最好在 Java 中使用 spark 2.0 API 来实现这一点。我曾尝试使用 UserDefinedAggregateFunction,但它不支持组内组,因为我必须按设备对每个用户组进行分组,以查找在每个设备上花费的汇总时间。

4

2 回答 2

1

这里的pivot功能非常有用。来自 Databricks 的一篇关于该主题的文章。对于代码(对不起,它是 Scala,但将其转换为 Java 应该不是什么大问题):

import org.apache.spark.sql.functions.udf

case class DeviceDistribution(mob: String, desk: String)

val makeDistribution = udf((mob: Long, desk: Long) => {
  val mobPct = 100.0 * mob / (mob + desk)
  val deskPct = 100.0 * desk / (mob + desk)

  DeviceDistribution(s"$mobPct%", s"$deskPct%")
})

// load your dataset

data
  .groupBy("user", "device")
  .agg(sum("time_spent").as("total_time_spent_by_device"))
  .groupBy("user")
  .pivot("device", Seq("mob", "desk"))
  .agg(first(col("total_time_spent_by_device")))
  .withColumn("total_time_spent", col("mob") + col("desk"))
  .withColumn("device_distribution", makeDistribution(col("mob"), col("desk")))
  .select("user", "total_time_spent", "device_distribution")
  .show

// Result
+-----+----------------+-------------------+
| user|total_time_spent|device_distribution|
+-----+----------------+-------------------+
|userA|              25|      [60.0%,40.0%]|
|userB|              20|      [50.0%,50.0%]|
+-----+----------------+-------------------+

注意:使用该pivot功能,您需要一个聚合功能。由于设备只有一个值,因此您可以简单地使用first.

device_distribution格式不完全是您正在寻找的,但是:

  • 在枢轴线之后,您可以使用您的值做任何您想做的事情(包括您想要的格式)
  • 例如,case class当以 json 格式保存输出数据时,这将具有您想要的格式。
于 2016-12-30T09:47:13.267 回答
1

弗洛伦特·莫尼,

谢谢回答我的问题。

但是,如果我想将其投入生产,我发现此解决方案存在一些问题。

例如,我需要提前知道我的 TB 数据源中可能有多少种设备。在这种情况下,事件枢轴也很难理解。

我已经在 J​​ava 中提供了这个问题的完整解决方案。在这里你可以看到它。

为此,我使用了 UserDefinedAggregateFunction,UDF 专门用于聚合情况。

基本上首先我已经对用户和设备进行了分组,然后调用这个自定义 UDF 来同时查找设备分布,在用户级别进行其他聚合。

https://github.com/himanshu-parmar-bigdata/spark-java-udf-demo

谢谢, Himanshu

于 2016-12-31T14:22:38.303 回答