2

我如何确保我始终使用 Flink 从 Kafka 主题的开头开始消费?

对于作为 Flink 1.0.2 一部分的Kafka 0.9.x 消费者,似乎不再是 Kafka 而是 Flink 来控制偏移量:

Flink 在内部将偏移量快照作为其分布式检查点的一部分。提交给 Kafka / ZooKeeper 的偏移量只是为了使外部的进度视图与 Flink 的进度视图同步。这样一来,监控和其他工作就可以了解 Flink Kafka 消费者对主题的消费程度。

这是我走了多远,但我的 Flink 程序总是从它停止的地方开始,并且不会按照配置的指示返回到开头:

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")

val incomingData = env.addSource(
  new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name",
    new IncomingDataSchema,
    props
  )
)
4

2 回答 2

3

利用:

consumer.setStartFromEarliest();
于 2019-09-24T09:58:37.980 回答
0

我认为您可以通过指定 random 来解决此问题group.id

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest"

auto.offset.reset只有在 ZooKeeper 中没有可用的初始偏移量时才有效。

于 2016-05-10T01:40:59.100 回答