4

我在驱动程序中有这个函数,它将 rdds 的结果收集到一个数组中并将其发回。但是,即使 RDD(在 dstream 中)有数据,该函数也会返回一个空数组……我做错了什么?

def runTopFunction() : Array[(String, Int)] = {
        val topSearches = some function....
        val summary = new ArrayBuffer[(String,Int)]()
        topSearches.foreachRDD(rdd => {
            summary = summary.++(rdd.collect())
        })    

    return summary.toArray
}
4

2 回答 2

1

So while the foreachRDD will do what you are looking to do, it is also non-blocking which means it won't wait until all of the stream is processed. Since you cal toArray on your buffer right after the call to foreachRDD, there won't have been any elements processed yet.

于 2015-02-26T00:46:37.133 回答
1

DStream.forEachRDD是对给定的操作DStream,将安排在每个流式批处理间隔上执行。这是稍后要执行的作业的声明式构造。

不支持以这种方式累加值,因为虽然 Dstream.forEachRDD 只是说“在每次迭代中执行此操作”,但周围的累加代码会立即执行,从而导致一个空数组。

根据summary计算后数据发生的情况,关于如何实现这一点的选择很少:

  • 如果需要由另一个进程检索数据,请使用共享线程安全结构。优先级队列非常适合 top-k 使用。
  • topSearches如果数据将被存储(fs,db),您可以在将函数应用于 dstream后写入存储。
于 2015-02-27T10:10:00.957 回答