3

我在尝试升级时遇到问题。

目前我使用的是 2.0.x 版本,特别是 -

reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

我正在使用 Maven,并且我对“projectreactor”有一个依赖项-

<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>

升级到 3.0.4.RELEASE 版本时,为了继续使用我之前使用的所有东西,我需要显式导入 -

<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>

<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>

但我还是不见了

reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

我不知道该怎么做。

4

2 回答 2

1

reactor.fn.Consumer被 Java 8 取代java.util.function.Consumer

至于RingBufferProcessor您必须选择一个全部使用环形缓冲区的新处理器。

Dispatchers 现在是在底层使用 Java 的调度程序Executor

于 2017-01-18T08:55:34.113 回答
1
reactor.rx.Stream -> reactor.core.publisher.Flux
reactor.rx.Streams -> reactor.core.publisher.Flux
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
reactor.fn.Consumer -> java.until.function.Consumer (Java 8)

没有新的 spring 模块,因为 spring 5 直接包含对这些新类型的 Reactor 支持。

至于 reactor-bus :按照设计,现在所有流路由(Flux/Mono 链)都是类型化的,因​​此动态路由还不是我们功能的一部分。仍然有打字方式的替代方案,例如:

ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));

interest1.subscribe(doSomethingForInterest1);
interest2.subscribe(doSomethingForInterest2);
interest1_2.subscribe(doSomethingForInterest1_2);

rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor

//shutdown/cleanup/close
rp.onComplete();

我在 github 上找到了这似乎符合您的需求

于 2017-01-18T16:26:25.280 回答