I'm writing a data ingestion program. It reads from Kafka into a DStream, splits the DStream into 3 streams, and performs an action on each one:
val stream = createSparkStream(Globals.configs, ssc)
val s1 = stream.filter(<predicate1>)
val s2 = stream.filter(<predicate2>)
val s3 = stream.filter(<predicate3>)
// I'm looking for something like this (hypothetical API; Spark has no forEachRddAsync):
s1.forEachRddAsync(...
s2.forEachRddAsync(...
s3.forEachRddAsync(...
Is it possible to trigger asynchronous job submission on the whole DStream rather than on individual RDDs?
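As far as I know, DStream itself has no async output operation, but RDDs do: `foreachAsync` and `foreachPartitionAsync` (from Spark's `AsyncRDDActions`, available on any `RDD[T]` with a `ClassTag`) submit a job and return a `FutureAction` immediately. A common workaround is therefore to do the three-way split inside a single `foreachRDD` and fire the branches concurrently. The sketch below assumes Spark 2.x/3.x on Scala 2.12+; `AsyncBranches`, `processBranchesConcurrently` and the per-record `sink` are hypothetical names, not part of Spark.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper (not a Spark API): for every micro-batch, runs one
// action per predicate concurrently instead of one after another.
object AsyncBranches {

  def processBranchesConcurrently[T: ClassTag](
      stream: DStream[T],
      predicates: Seq[T => Boolean],
      sink: T => Unit // hypothetical per-record action; runs on executors, so it must be serializable
  ): Unit = {
    // The body of foreachRDD runs on the driver, once per batch
    stream.foreachRDD { rdd =>
      // Cache the batch so each branch does not recompute it from the source
      rdd.cache()
      try {
        // foreachPartitionAsync submits a Spark job and returns a FutureAction at once,
        // so all branch jobs are in flight at the same time
        val jobs: Seq[Future[Unit]] = predicates.map { p =>
          rdd.filter(p).foreachPartitionAsync(records => records.foreach(sink))
        }
        // Block until every branch finishes (rethrowing any failure), so the
        // batch's output operation only completes after all branches are done
        Await.result(Future.sequence(jobs), Duration.Inf)
      } finally {
        rdd.unpersist()
      }
    }
  }
}
```

With the `stream` from the question, usage would look like `AsyncBranches.processBranchesConcurrently(stream, Seq(p1, p2, p3), record => ...)`, where `p1`..`p3` stand in for the three predicates. Blocking at the end of each batch keeps batches from overlapping; letting whole batches run in parallel instead relies on the undocumented `spark.streaming.concurrentJobs` setting, which gives up ordering between batches.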