0

我想写一个 Kafka 消费者并在 Bigquery 中写入记录,我想在 Bigquery 中成功插入时手动提交偏移量。我写了一个示例代码,但它不起作用,有人可以帮忙吗

ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))

        Flux<ReceiverRecord<Integer, String>> kafkaFlux1 = KafkaReceiver.create(options).receive()
                .doOnNext(r -> {
                    try {
                        writeBigquery(r);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    r.receiverOffset().commit().block();
    });

        return kafkaFlux1.subscribe(record -> {
            System.out.println("hello"+record);
        });
4

0 回答 0