我们正在尝试从 2.X 迁移到 3.X。 https://github.com/reactor/reactor-core/issues/375 我们在我们的应用程序中使用了 EventBus 作为事件管理器(低延迟 FX 系统),它非常适合我们。
更改后,我们决定采用每个模块并创建自己的处理器来处理事件。1. 从您的角度来看,这种用法是否正确?因为在当前阶段缺乏文档,并且在审查了所有内容之后,我们真的不知道在这里做什么 2. 我们尝试使用 Flux 以便每隔 X 间隔执行一次操作 例如:市场在 1 秒内到达 1000但我们希望在一秒钟内只处理 4 次更新。升级后我们正在使用:
带有缓冲区的处理器并发送到另一个方法。在这种方法中,我们有 Flux 获取列表并尝试并行工作以完成他的任务。我们遇到了 2 个主要问题: 1. 有时我们收到 Null 事件,我们找不到我们的系统正在发送给我想也许我们错过了使用处理器
//Definition of processor
ReplayProcessor<Event> classAEventProcessor = ReplayProcessor.create();
//Event handler subscribing
public void onMyEventX(Consumer<Event> consumer) {
Flux<Event> handler = classAEventProcessor .filter(event -> event.getType().equals(EVENT_X));
handler.subscribe(consumer);
}
在上面的例子中,处理程序中的事件有时会为空。一旦他做了流停止工作,直到我们重新启动服务器(因为只有在重新启动时我们才会创建处理器)
2.我们尝试过并行,但有时一些消息消失了,所以也许我们滥用了框架
//On constructor
tickProcessor.buffer(1024, Duration.of(250, ChronoUnit.MILLIS)).subscribe(markets ->
handleMarkets(markets));
//Handler
Flux.fromIterable(getListToProcess())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(entryMap -> {
DoBlockingWork(entryMap);
})
.sequential()
.subscribe();
这样做的目的是处理器将每 250 毫秒唤醒一次并调用处理程序。处理程序将与 Flux 并行工作,以进行更好和更快的处理。*如果 DoBlockingWork 花费超过 250 毫秒,我无法理解会发生什么行为
更新: EventBus 由我们包装,每个订阅的事件都会抛出包装的事件管理器。现在我们尝试为每个模块创建事件处理器,但它的工作速度非常慢。我们已经将 TopicProcessor 与 ThreadExecutor 一起使用,但仍然很慢.. EventBus 高速完成了同样的工作有人知道吗?顺便说一句,当我尝试使用 DirectProcessor 时,它似乎比 TopicProcessor 工作得更好