0

我正在使用反应器库从网络中获取大量数据流,并使用反应式 kafka 方法将其发送到 kafka 代理。

下面是我正在使用的 Kafka Producer

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        AtomicInteger fCount = new AtomicInteger(0);
        
        records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
        System.out.println("Total Records: " + fCount);
        
        sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
                .map(record -> {
                    LogRecord lrec = record.getRecords().get(0);
                    String id = lrec.getId();
                    return SenderRecord.create(new ProducerRecord<>(topic, id,
                            lrec.toString()), id);
                })).then()
                .doOnError(e -> {
                    log.error("[FAIL]: Send to the topic: '{}' failed. "
                            + e, topic);
                })
                .doOnSuccess(s -> {
                    log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
                })
                .subscribe();
    }

}

Flux 中的记录总数(fCount)和发送到 Kafka 主题的记录(sentCount)不匹配,它没有给出任何错误并成功完成。

例如:在一种情况下,Flux 中的记录总数为 2758,而发送到 kafka 的计数为 256。是否有任何 kafka 配置需要修改,或者我错过了什么?

==================================================== =========

根据评论更新

sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            sleep(5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. "
                    + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
sleep(10); // sleep for 10 ns

上面的代码在一个系统中运行良好,但未能在另一个系统中发送所有消息。

4

0 回答 0