0

我的 Spark 流式传输作业正在使用来自 Kafka 的数据

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);

每当我重新开始我的工作时,它就会从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要很多时间,如果我更改消费者组,它会立即使用新消息)

我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 时,都会从我离开的地方发送数据。

我的用例要求我忽略这些数据并只处理到达的数据。我怎样才能做到这一点?任何建议

4

1 回答 1

4

有两种方法可以实现这一点:

  1. 每次重启时创建一个唯一的消费者组,它将从最新的偏移量开始消费。

  2. 使用直接方法而不是基于接收器;在这里,您可以更好地控制您的消费方式,但必须手动更新 zookeeper 以存储您的偏移量。在下面的示例中,它将始终从最新的偏移量开始。

    import org.apache.spark.streaming.kafka._
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    

此处有关直接方法的文档:https ://spark.apache.org/docs/latest/streaming-kafka-integration.html

于 2015-05-11T11:51:49.823 回答