0

我是响应式编程的新手,我尝试实现一个非常基本的场景。每次将文件拖放到特定文件夹时,我都想向 kafka 发送一条消息。我认为我不太了解基础知识...所以请您帮帮我吗?

所以我有几个问题: smallrye-reactive-messaging 和 smallrye-reactive-streams-operators 有什么区别?

我有这个简单的代码:

@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
     if(Objects.isNull(currentMessage)){
          //currentMessage is an instance variable which is null when I start the application
          return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
     }
     else {
          //currentMessage has been correctly set with the file information
          LOGGER.info(currentMessage);
          return ReactiveStreams.of(currentMessage).map(Message::of);
     }
}

当代码进入 if 语句时,一切正常,我的对象的 JSON 序列化将为空值。但是我不明白为什么当我的代码转到 else 语句时,没有任何话题?似乎 if 语句的 .of 指令破坏了流或类似的东西......

如何保持对新删除的文件“做出反应”的连续流?(或其他事件,如 HTTP GET 请求或类似的东西)......

例如,如果我不返回 PublisherBuilder 的实例,而是返回 Integer,那么我的 kafka 主题将由非常庞大的 Integer 值流填充。这就是为什么示例在发送消息时使用一些间隔......

我应该使用一些 CompletationStage 还是 CompletableFuture ?RXJAva2?使用哪个库有点令人困惑(vertx、smallrye、rxjava2、microprofile、...)

之间有什么区别:

  • ReactiveStreams.fromCompletionStage
  • ReactiveStreams.fromProcessor
  • ReactiveStreams.fromPublisher
  • ReactiveStreams.fromSubscriber

在哪种情况下使用哪个?

非常感谢 !

4

1 回答 1

0

让我们从 smallrye-reactive-messaging 和 smallrye-reactive-streams-operators 之间的区别开始: smallrye-reactive-streams-operators 与 smallrye-reactive-messaging 相同,但此外它还支持MicroProfile-context-propagation。由于大多数响应式消息传递提供程序在后台使用 Vert.x,因此它将以事件循环样式处理您的消息,这意味着它将在单独的线程中运行。有时您需要将一些 ctx 从基本线程传播到新线程(例如:填充 CDI 和 Tx 上下文以执行一些 JPA 实体管理器逻辑)。在这里 ctx 传播有帮助。

对于方法签名。您可以查看SmallRye-reactive-streams第 3,4 和 5 节的官方文档。每个都有不同的用例。您想使用哪种口味取决于您。

什么时候用什么?如果您不在响应式上下文中运行,则可以使用以下内容发送消息。

@Inject @Channel("my-channel") 发射器发射器;

对于消息消费,您可以使用这样的方法签名:

@Incoming("channel-2") public CompletionStage doSomething(Message anEvent)

或者

@Incoming("channel-2") public void doSomething(String anEvent)

希望有帮助。

于 2020-02-19T17:53:08.110 回答