0

我有一个场景,我想使用 Spark DStreams 重新处理来自 Kafka 的特定批次数据。

假设我想重新处理以下批次的数据。

主题分区1-{1000,2000} 主题分区2-{500-600}

下面是我拥有的代码片段,我可以在其中指定起始偏移量。

val inputDStream = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](
      topic-partition-list,
      kafkaProps,
      starting-offset-ranges))

但是,我想知道他们是否也可以指定结束偏移量,例如结构化流批处理模式。

所以本质上,它应该处理这个小批量并停止工作流程。

注意:我不想在这个用例中使用结构化流。只想使用 DStreams。

4

1 回答 1

0

找到了一种方法来做到这一点。

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
于 2018-12-20T06:31:20.567 回答