Is there a way for a function to return two DStreams after applying filter? For example, when I filter a DStream, the elements that pass the filter would be stored in one DStream and the elements that do not pass would be stored in another.

1 Answer

This could be done more efficiently if it were built in, but:

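import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream

// Split a stream into (elements matching pred, elements not matching pred).
def partition[A: ClassTag](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) = {
  // Tag each element with the result of pred, and cache so pred runs once per element.
  val stream1 = stream.map(x => (x, pred(x))).cache()
  val good = stream1.filter(_._2).map(_._1)
  val bad = stream1.filter(!_._2).map(_._1)
  (good, bad)
}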

Note that cache() ensures stream1 is computed only once; if pred is simple enough and stream is already cached, (stream.filter(pred), stream.filter(x => !pred(x))) should be faster.
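
For illustration, the simpler two-filter variant might look like the sketch below. The names partitionByFilter, example, and numbers are hypothetical and not part of the original answer. It assumes Spark Streaming is on the classpath and that the input stream is cheap to recompute or already cached.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: applies pred in two separate filters instead of
// tagging each element first. Assumes pred is cheap and stream is cached.
def partitionByFilter[A](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) =
  (stream.filter(pred), stream.filter(x => !pred(x)))

// Assumed usage: `numbers` is a DStream[Int] created elsewhere.
def example(numbers: DStream[Int]): Unit = {
  // Cache the input so the two filters do not recompute it.
  val (evens, odds) = partitionByFilter(numbers.cache())(_ % 2 == 0)
  evens.print()
  odds.print()
}
```

Here pred is evaluated twice per element, once in each filter, so this only pays off when pred is cheaper than building and caching the intermediate stream of tuples.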

Answered 2016-04-21T13:29:15.943