1

我们正在尝试从 2.X 迁移到 3.X。 https://github.com/reactor/reactor-core/issues/375 我们在我们的应用程序中使用了 EventBus 作为事件管理器(低延迟 FX 系统),它非常适合我们。

更改后,我们决定采用每个模块并创建自己的处理器来处理事件。1. 从您的角度来看,这种用法是否正确?因为在当前阶段缺乏文档,并且在审查了所有内容之后,我们真的不知道在这里做什么 2. 我们尝试使用 Flux 以便每隔 X 间隔执行一次操作 例如:市场在 1 秒内到达 1000但我们希望在一秒钟内只处理 4 次更新。升级后我们正在使用:

带有缓冲区的处理器并发送到另一个方法。在这种方法中,我们有 Flux 获取列表并尝试并行工作以完成他的任务。我们遇到了 2 个主要问题: 1. 有时我们收到 Null 事件,我们找不到我们的系统正在发送给我想也许我们错过了使用处理器

    //Definition of processor
    ReplayProcessor<Event> classAEventProcessor = ReplayProcessor.create();

    //Event handler subscribing
    public void onMyEventX(Consumer<Event> consumer) {
       Flux<Event> handler = classAEventProcessor .filter(event -> event.getType().equals(EVENT_X));
       handler.subscribe(consumer);
   }

在上面的例子中,处理程序中的事件有时会为空。一旦他做了流停止工作,直到我们重新启动服务器(因为只有在重新启动时我们才会创建处理器)

2.我们尝试过并行,但有时一些消息消失了,所以也许我们滥用了框架

    //On constructor
   tickProcessor.buffer(1024, Duration.of(250, ChronoUnit.MILLIS)).subscribe(markets -> 
   handleMarkets(markets));

   //Handler
  Flux.fromIterable(getListToProcess())
        .parallel()
    .runOn(Schedulers.parallel())
    .doOnNext(entryMap -> {
        DoBlockingWork(entryMap);
    })
    .sequential()
    .subscribe();

这样做的目的是处理器将每 250 毫秒唤醒一次并调用处理程序。处理程序将与 Flux 并行工作,以进行更好和更快的处理。*如果 DoBlockingWork 花费超过 250 毫秒,我无法理解会发生什么行为

更新: EventBus 由我们包装,每个订阅的事件都会抛出包装的事件管理器。现在我们尝试为每个模块创建事件处理器,但它的工作速度非常慢。我们已经将 TopicProcessor 与 ThreadExecutor 一起使用,但仍然很慢.. EventBus 高速完成了同样的工作有人知道吗?顺便说一句,当我尝试使用 DirectProcessor 时,它似乎比 TopicProcessor 工作得更好

4

1 回答 1

1

Reactor 3 是围绕您应该尽可能避免阻塞的概念构建的,因此在您的第二个片段DoBlockingWork中看起来不太好。

事件是如何产生的?您是否有基于侦听器的异步 API 来获取它们?如果是这样,您可以尝试使用Flux.create.

对于“我们在 1 秒内有 1000 个事件,但只想处理 4 个”的用例,我会链接一个sample操作员。例如,sample(Duration.ofMillis(250))将每秒分为 4 个窗口,它只会发出最后一个元素。

正在编写参考指南以及一个页面,您可以在其中找到指向外部文章和学习材料的链接。这里有 WIP 参考指南的预览,这里学习资源页面。

于 2017-01-23T16:48:29.460 回答