1

我想用 Spark 做一些自定义 groupBy 聚合,这需要按顺序处理记录(时间戳),并且第 n 条记录的处理需要前(n-1)条记录的处理输出(听起来有点像流式传输任务?)。输入位于按日期分区的一大组文件中。

我目前的解决方案是实现一个 custom org.apache.spark.sql.expressions.Aggregator,它将所有输入记录增量插入缓冲区并在最后进行所有聚合。伪代码如下:

class MyAgg extends Aggregator[IN, SortedList[IN], OUT] {
    override def zero: SortedList[IN] = SortedList.empty

    override def reduce(b: SortedList[IN], e: Event): SortedList[IN] =
        insert_into_b(e)

    override def merge(b1: SortedList[IN], b2: SortedList[IN]): SortedList[IN] =
        merge_two_lists(b1, b2)

    override def finish(b: SortedList[IN]): OUT =
        my_main_aggregation_happens_here:
            b.foldLeft ...
}

val result = myInputDS.groupBy(_.key).agg((new MyAgg()).toColumn) 

这个解决方案有效,但我对性能有很大的担忧,因为 reduce 阶段根本不会减少任何东西,并且所有记录都需要存储在内存中直到最后。我希望有更好的解决方案。

能否请你帮忙?谢谢。

4

0 回答 0