我是响应式编程的新手,我尝试实现一个非常基本的场景。每次将文件拖放到特定文件夹时,我都想向 kafka 发送一条消息。我认为我不太了解基础知识...所以请您帮帮我吗?
所以我有几个问题: smallrye-reactive-messaging 和 smallrye-reactive-streams-operators 有什么区别?
我有这个简单的代码:
@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
if(Objects.isNull(currentMessage)){
//currentMessage is an instance variable which is null when I start the application
return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
}
else {
//currentMessage has been correctly set with the file information
LOGGER.info(currentMessage);
return ReactiveStreams.of(currentMessage).map(Message::of);
}
}
当代码进入 if 语句时,一切正常,我的对象的 JSON 序列化将为空值。但是我不明白为什么当我的代码转到 else 语句时,没有任何话题?似乎 if 语句的 .of 指令破坏了流或类似的东西......
如何保持对新删除的文件“做出反应”的连续流?(或其他事件,如 HTTP GET 请求或类似的东西)......
例如,如果我不返回 PublisherBuilder 的实例,而是返回 Integer,那么我的 kafka 主题将由非常庞大的 Integer 值流填充。这就是为什么示例在发送消息时使用一些间隔......
我应该使用一些 CompletationStage 还是 CompletableFuture ?RXJAva2?使用哪个库有点令人困惑(vertx、smallrye、rxjava2、microprofile、...)
之间有什么区别:
- ReactiveStreams.fromCompletionStage
- ReactiveStreams.fromProcessor
- ReactiveStreams.fromPublisher
- ReactiveStreams.fromSubscriber
在哪种情况下使用哪个?
非常感谢 !