我正在使用反应器库从网络中获取大量数据流,并使用反应式 kafka 方法将其发送到 kafka 代理。
下面是我正在使用的 Kafka Producer
public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic, Flux<Logs.Data> records) {
AtomicInteger sentCount = new AtomicInteger(0);
AtomicInteger fCount = new AtomicInteger(0);
records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
System.out.println("Total Records: " + fCount);
sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
}
}
Flux 中的记录总数(fCount)和发送到 Kafka 主题的记录(sentCount)不匹配,它没有给出任何错误并成功完成。
例如:在一种情况下,Flux 中的记录总数为 2758,而发送到 kafka 的计数为 256。是否有任何 kafka 配置需要修改,或者我错过了什么?
==================================================== =========
根据评论更新
sender.send(records
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
sleep(5); // sleep for 5 ns
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
sleep(10); // sleep for 10 ns
上面的代码在一个系统中运行良好,但未能在另一个系统中发送所有消息。