0

我正在使用反应器在整个应用程序中发布事件,并让不同的消费者响应他们的事件。

这是我的反应器配置

@Configuration
@EnableReactor
public class ReactorConfiguration {

    static {
       Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus eventBus() {
       return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get();
}

我期望使用默认的基于环形缓冲区的调度程序,并且应该并行处理发送给单个消费者的多条消息。相反,它似乎是以同步方式处理事件。线程shared-1用于处理我的event1到consumer1,然后只有在完成event1的处理后,同一个线程才开始在consumer1上处理event2。

如何以一种我应该能够将多个事件发送给多个消费者并且所有事件都并行处理的方式来实现并行处理。

我将不胜感激任何建议。

这就是我将事件分派到事件总线的方式

dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event);

protected <T> void dispatch(String selector, String info, T event) {
    eventBus.notify(selector, Event.wrap(Tuple.of(info, event)));
}

这是其中一位消费者

@Consumer
public class ReportRequestHandler {
...
  @Selector(ReactorEvents.REPORT_REQUEST_EVENT)
  @Override
  public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) {
    ReportRequestEvent event = tuple.getT2();
    log.debug("processing report request " + event.getId());
    ....
  }
}
4

1 回答 1

1

默认的 Ring Buffer 实现是单线程的。这就是为什么您正在观察您所描述的副作用。有关可供您选择的各种调度程序的更多详细信息,请查看以下链接:

http://projectreactor.io/docs/reference/#_dispatchers

于 2015-12-23T03:40:42.533 回答