3

我正在尝试将部分函数传递给通过滑动窗口在 DStream 批处理中捕获的所有 RDD 的联合。假设我在离散为 1 秒批次的流上构建了一个超过 10 秒的窗口操作:

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

window将有 K 个 RDD。我想collect(f: PartialFunction[T, U])在所有 K 个这些 RDD 的联合上使用。++我可以使用调用联合运算符foreachRDD,但我想返回一个RDDnot aUnit并避免副作用。

我正在寻找的是一个减速器

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]

DStream我可以像这样使用:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

但这在 Spark Streaming API 中不可用。

有没有人有任何好的想法可以将流中捕获的 RDD 组合成单个 RDD,以便我可以传入部分函数?还是为了实现我自己的 RDD 减速器?也许这个功能会在随后的 Spark 版本中出现?

4

1 回答 1

2

DStream 操作不直接支持部分功能,但实现相同功能并不难。

例如,让我们采用一个简单的偏函数,它接受一个字符串,如果它是一个数字,则产生一个字符串的 Int:

val pf:PartialFunction[String,Int] = {case x if (Try(x.toInt).isSuccess) => x.toInt}

我们有一个字符串的 dstream:

val stringDStream:DStream[String] = ??? // use your stream source here

然后我们可以像这样将偏函数应用于 DStream:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
于 2014-11-08T11:22:05.730 回答