In a function, is there a way to return two DStreams after using filter?
For example, when I filter a DStream, I want the elements that pass the filter stored in one DStream and the elements that fail it stored in another DStream.
1 Answer
This could be done more efficiently if it were built in, but here is one approach:
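```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream
// Returns (elements where pred is true, elements where pred is false)
def partition[A: ClassTag](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) = {
  // Tag each element with pred's result once; cache so both branches reuse it
  val stream1 = stream.map(x => (x, pred(x))).cache()
  val good = stream1.filter(_._2).map(_._1)
  val bad = stream1.filter(!_._2).map(_._1)
  (good, bad)
}
```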
Note that cache() ensures stream1 is computed only once. If pred is cheap enough and stream is already cached, (stream.filter(pred), stream.filter(x => !pred(x))) should be faster.
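To make the trade-off concrete, here is a small sketch on a plain Scala `Seq` rather than a DStream, so it runs without Spark. The object and method names (`PartitionSketch`, `partitionTagged`, `partitionTwice`) are hypothetical. It only counts how many times the predicate runs. It does not model Spark's caching or the recomputation of an uncached parent stream, which is where most of the real cost difference comes from.

```scala
object PartitionSketch {
  // Tag-once approach (same shape as the DStream answer): pred runs once per
  // element, at the cost of building an intermediate (element, flag) collection.
  def partitionTagged[A](xs: Seq[A])(pred: A => Boolean): (Seq[A], Seq[A]) = {
    val tagged = xs.map(x => (x, pred(x)))
    (tagged.filter(_._2).map(_._1), tagged.filter(!_._2).map(_._1))
  }

  // Two-filter approach: no intermediate tuples, but pred runs twice per element.
  def partitionTwice[A](xs: Seq[A])(pred: A => Boolean): (Seq[A], Seq[A]) =
    (xs.filter(pred), xs.filter(x => !pred(x)))

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Predicate that counts its own invocations
    val isEven: Int => Boolean = { n => calls += 1; n % 2 == 0 }
    val xs = Seq(1, 2, 3, 4, 5, 6)

    println(partitionTagged(xs)(isEven)) // (List(2, 4, 6),List(1, 3, 5))
    println(s"tagged: $calls calls")     // tagged: 6 calls
    calls = 0
    println(partitionTwice(xs)(isEven))  // (List(2, 4, 6),List(1, 3, 5))
    println(s"twice: $calls calls")      // twice: 12 calls

    // Plain Scala collections already provide this as a built-in method
    println(xs.partition(_ % 2 == 0))    // (List(2, 4, 6),List(1, 3, 5))
  }
}
```

On a local `Seq` the two-filter version only pays for the extra predicate calls. On a DStream, each branch can also recompute the whole upstream lineage unless the source is cached, which is why the answer recommends the two-filter form only when `stream` is already cached.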
Answered 2016-04-21T13:29:15.943