1

我想就如何正确实施这一点以及这是否是正确的方法获得意见。因此,我使用反应式消息创建了一个完全反应式的应用程序,但我知道我有一个总体约束——那个约束是 Oracle。如您所知,JDBC 本质上是阻塞的,不能真正异步执行。我试图找出一种方法来实现这一点,这样事件循环线程就不会被阻塞,到目前为止,唯一可行的事情是这样的:

@Incoming(KAFKA_DATA)
@Blocking
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
fun consume(record: Message<Notification>): CompletionStage<Void> {
    val metadata = record.getMetadata(IncomingKafkaRecordMetadata::class.java).orElse(null)
    if (metadata != null) {
        log.debug(
            "Received Kafka Record - " +
                "topic: {}, " +
                "partition: {}, " +
                "offset: {}, " +
                "payload: {}",
            metadata.topic, metadata.partition, metadata.offset, record.payload
        )
    }
    return Uni.createFrom().voidItem()
        .invoke { -> invokeBlockingCall(record.payload) }
        .call { -> handleSuccess(record) }
        .onFailure().call { failure -> handleFailure(failure, record) }
        .runSubscriptionOn(Infrastructure.getDefaultExecutor())
        .subscribeAsCompletionStage()
}

我已经设法让它按原样工作,但我不确定这是否是正确/有效的方式。有没有人对如何完成这样的任务有任何建议?任何建议都非常感谢和考虑,谢谢!

4

0 回答 0