我想写一个 Kafka 消费者并在 Bigquery 中写入记录,我想在 Bigquery 中成功插入时手动提交偏移量。我写了一个示例代码,但它不起作用,有人可以帮忙吗
ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
Flux<ReceiverRecord<Integer, String>> kafkaFlux1 = KafkaReceiver.create(options).receive()
.doOnNext(r -> {
try {
writeBigquery(r);
} catch (IOException e) {
e.printStackTrace();
}
r.receiverOffset().commit().block();
});
return kafkaFlux1.subscribe(record -> {
System.out.println("hello"+record);
});