In a function, is there a way to return two DStreams after using filter? For example, when I filter a DStream, the elements that pass the filter should be stored in one DStream and the elements that fail it in another DStream.
If this were built in, it could be done more efficiently, but:
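import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream

// Returns (elements satisfying pred, elements not satisfying pred).
// A needs a ClassTag because DStream.map requires one for its result type.
def partition[A: ClassTag](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) = {
  // Tag each element with pred's result once; cache so both filters reuse it.
  val stream1 = stream.map(x => (x, pred(x))).cache()
  val good = stream1.filter(_._2).map(_._1)
  val bad = stream1.filter(!_._2).map(_._1)
  (good, bad)
}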
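Note that cache() ensures that stream1 is computed only once per batch, even though two filters read from it. If pred is simple enough and stream is already cached, (stream.filter(pred), stream.filter(x => !pred(x))) should be faster: it skips building and caching the intermediate (x, pred(x)) pairs, at the cost of evaluating pred twice per element.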
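A minimal, self-contained sketch for trying both approaches side by side, assuming spark-streaming is on the classpath and Spark runs in local mode. The names DStreamPartitionSketch, partitionCached, partitionTwoFilters and run are hypothetical. partitionCached is the cache-once helper from the answer with an explicit return type and a ClassTag bound; partitionTwoFilters is the two-filter alternative. queueStream is used only to feed a single test batch, and the 5-second wait is a timing assumption, not a guarantee.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object DStreamPartitionSketch {

  // Cache-once version: pred runs once per element, and the cached
  // tagged stream feeds both filters.
  def partitionCached[A: ClassTag](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) = {
    val tagged = stream.map(x => (x, pred(x))).cache()
    (tagged.filter(_._2).map(_._1), tagged.filter(!_._2).map(_._1))
  }

  // Two-filter version: pred runs twice per element and stream is read twice,
  // so it only pays off when pred is cheap and stream is already cached.
  def partitionTwoFilters[A](stream: DStream[A])(pred: A => Boolean): (DStream[A], DStream[A]) =
    (stream.filter(pred), stream.filter(x => !pred(x)))

  // Runs both versions on one batch of 1..10 and returns, sorted, what each output
  // produced: cached evens, cached odds, two-filter evens, two-filter odds.
  def run(): Seq[List[Int]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamPartitionSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream emits one queued RDD per batch; later batches are empty.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 10))
    // Cache the input, as the two-filter version assumes.
    val numbers = ssc.queueStream(queue).cache()

    val isEven: Int => Boolean = _ % 2 == 0
    val (evensA, oddsA) = partitionCached(numbers)(isEven)
    val (evensB, oddsB) = partitionTwoFilters(numbers)(isEven)

    // foreachRDD bodies run on the driver, so collecting into local buffers works here.
    val seen = Seq.fill(4)(mutable.ArrayBuffer.empty[Int])
    Seq(evensA, oddsA, evensB, oddsB).zip(seen).foreach { case (s, buf) =>
      s.foreachRDD(rdd => buf ++= rdd.collect())
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // timing-based: long enough for the first batch
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    seen.map(_.sorted.toList)
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Both versions should split the batch the same way; the difference is only in how much work each batch costs, which depends on how expensive pred is and whether the input is already cached.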
Answered 2016-04-21T13:29:15.943