0

我正在编写一个数据摄取程序。从 Kafka 读取到 DStream 将 Dstrem 拆分为 3 个流并在每个流上执行操作:

val stream = createSparkStream(Globals.configs, ssc)
val s1 = stream.filter(<predicat1>)
val s2 = stream.filter(<predicat2>)
val s3 = stream.filter(<predicat3>)

//I'm looking for something like:
s1.forEachRddAsync(...
s2.forEachRddAsync(...
s3.forEachRddAsync(... 

如果可以在整个 DStream 而不是 RDD 上触发异步提交。

4

1 回答 1

0

DStream动作方法虽然确实是阻塞的,但不处理数据。这些仅注册DStream为输出流。

一旦StreamingContext启动,处理将根据可用资源进行调度,如果这些允许,则在不相互限制的情况下进行处理。

于 2018-10-14T09:33:16.030 回答