这是上一个响应式 kafka 问题(发送数据通量到响应式 kafka 时发出的问题)的后续问题。
我正在尝试使用响应式方法将一些日志记录发送到 kafka。这是使用响应式 kafka 发送消息的响应式代码。
public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
AtomicInteger sentCount = new AtomicInteger(0);
sender.send(records
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
Thread.sleep(0, 5); // sleep for 5 ns
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).doOnNext(res -> sentCount.incrementAndGet()).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
}
}
public class ExecuteQuery implements Runnable {
private LogProducer producer = new LogProducer("localhost:9092");
@Override
public void run() {
Flux<Logs.Data> records = ...
producer.sendMessages(kafkaTopic, records);
.....
.....
// processing related to the messages sent
}
}
因此,即使Thread.sleep(0, 5);
存在,有时它也不会将所有消息发送到 kafka,并且程序存在早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
)。有没有更具体的方法来解决这个问题。例如,使用某种回调,以便该线程将等待所有消息发送成功。
我有一个弹簧控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样
public class Main {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
public static void main(String[] args) {
QueryScheduler scheduledQuery = new QueryScheduler();
scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}
class QueryScheduler implements Runnable {
@Override
public void run() {
// preprocessing related to time
executor.execute(new ExecuteQuery());
// postprocessing related to time
}
}
}