2

我有一个来自 Kafka 的流式传输作业(使用createDstream)。它的“id”流

[id1,id2,id3 ..]

我有一个实用程序或 api,它接受一个 id 数组并进行一些外部调用并接收一些信息,每个 id 都说“t”

[id:t1,id2:t2,id3:t3...]

我想DStream在调用实用程序时保留 Dstream。我不能在 Dstream rdd 上使用地图转换,因为它会调用每个 id,而且该实用程序正在接受 id 的集合。

Dstream.map(x=> myutility(x)) -- ruled out

如果我使用

Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray))

我失去了DStream. 我需要保留DStream用于下游处理。

4

1 回答 1

4

实现外部批量调用的方法是直接在分区级别对 DStream 中的 RDD 进行转换。

该模式如下所示:

val transformedStream = dstream.transform{rdd => 
    rdd.mapPartitions{iterator => 
      val externalService = Service.instance() // point to reserve local resources or make server connections.
      val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level
      val resultCollection = externalService(data)
      resultCollection.iterator
    }
 }

这种方法使用集群中可用的资源并行处理底层 RDD 的每个分区。请注意,需要为每个分区(而不是每个元素)实例化与外部系统的连接。

于 2017-01-06T15:07:52.073 回答