我想从 Kafka 接收到 Flink 程序的最新数据,但是 Flink 正在读取历史数据。我已经设置auto.offset.reset
为latest
如下所示,但它没有工作
properties.setProperty("auto.offset.reset", "latest");
Flink Programm 正在使用以下代码从 Kafka 接收数据
//getting stream from Kafka and giving it assignTimestampsAndWatermarks
DataStream<JoinedStreamEvent> raw_stream = envrionment.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new JoinSchema(), properties)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
我正在关注 https://issues.apache.org/jira/browse/FLINK-4280上的讨论,这建议以下面提到的方式添加源
Properties props = new Properties();
...
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
kafka.setForwardMetrics(boolean);
...
env.addSource(kafka)
我做了同样的事情,但是我无法访问setStartFromLatest()
FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09<JoinedStreamEvent>( "test", new JoinSchema(),properties);
我应该怎么做才能接收发送到 Kafka 的最新值而不是从历史中接收值?