因此,我将 Quarkus 与 Microprofile Reactive Messaging 框架(带有 SmallRye Kafka 连接器)和 RxJava2 Flowable 流对象一起用于响应式消息接收/发送。我有一个微服务,它使用 @Incoming 和 @Outgoing 注释来正确使用通道从后面的主题中提取并将消息推送到主题。
但是,现在我想修改它,以便我仍然可以从 Kafka 主题中提取数据,然后将 JSON 有效负载发送到 REST 端点。据我所知,没有与 Quarkus HTTP 兼容的 SmallRye 连接器。有没有人碰巧知道任何方法可以让它工作?
示例函数
@Incoming("pre-check")
@Outgoing("post-check")
@Broadcast
public Flowable<CustomMessage> publishToApi(CustomMessage customMessage) {
LOGGER.info("Message received from topic = {}", customMessage);
if (customMessage.ready) {
return Flowable.just(customMessage);
}
else {
return Flowable.empty();
}
}