我想就如何正确实施这一点以及这是否是正确的方法获得意见。因此,我使用反应式消息创建了一个完全反应式的应用程序,但我知道我有一个总体约束——那个约束是 Oracle。如您所知,JDBC 本质上是阻塞的,不能真正异步执行。我试图找出一种方法来实现这一点,这样事件循环线程就不会被阻塞,到目前为止,唯一可行的事情是这样的:
@Incoming(KAFKA_DATA)
@Blocking
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
fun consume(record: Message<Notification>): CompletionStage<Void> {
val metadata = record.getMetadata(IncomingKafkaRecordMetadata::class.java).orElse(null)
if (metadata != null) {
log.debug(
"Received Kafka Record - " +
"topic: {}, " +
"partition: {}, " +
"offset: {}, " +
"payload: {}",
metadata.topic, metadata.partition, metadata.offset, record.payload
)
}
return Uni.createFrom().voidItem()
.invoke { -> invokeBlockingCall(record.payload) }
.call { -> handleSuccess(record) }
.onFailure().call { failure -> handleFailure(failure, record) }
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.subscribeAsCompletionStage()
}
我已经设法让它按原样工作,但我不确定这是否是正确/有效的方式。有没有人对如何完成这样的任务有任何建议?任何建议都非常感谢和考虑,谢谢!