我是 Spring Reactor 的新手。我一直试图了解ConnectableFlux类的工作原理。我已经阅读了文档并查看了在线发布的示例,但遇到了一个问题。
有人能告诉我为什么connect()方法会阻塞吗?我在文档中没有看到任何内容说它应该阻止..特别是因为它返回一个 Disposable 供以后使用。鉴于下面的示例代码,我永远不会通过 connect() 方法。
我试图基本上模拟我过去多次使用的旧式监听器接口范例。我想学习如何使用反应式流重新创建服务类和侦听器架构。我有一个简单的服务类,它有一个名为“ addUpdateListener(Listener l) ”的方法,然后当我的服务类“ doStuff() ”方法触发一些事件传递给任何监听器。
我应该说我将编写一个 API 供其他人使用,所以当我说 Service 类时,我并不是指 Spring 术语中的 @Service。这将是一个普通的 java 单例类。
我只是将 Spring Reactor 用于 Reactive Streams。我也在看 RxJava.. 但想看看 Spring Reactor Core 是否可以工作。
我从下面的测试类开始只是为了理解库语法,然后陷入了阻塞问题。
我认为这里描述了我正在寻找的内容:多个订阅者
更新:通过调试器运行我的代码,ConnectableFlux 连接方法中的代码永远不会返回。它挂在内部连接方法上,并且永远不会从该方法返回。
reactor.core.publisher.ConnectableFlux
public final Disposable connect() {
Disposable[] out = new Disposable[]{null};
this.connect((r) -> {
out[0] = r;
});
return out[0];
}
任何帮助都会很棒!
这也是我的 maven pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SpringReactorTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
</excludes>
</artifactSet>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofSeconds;
/**
* Testing ConnectableFlux
*/
public class Main {
private final static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
Main m = new Main();
// Get the connectable
ConnectableFlux<Object> flux = m.fluxPrintTime();
// Subscribe some listeners
// Tried using a new thread for the subscribers, but the connect call still blocks
LOG.info("Subscribing");
Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));
LOG.info("Connecting...");
Disposable connect = flux.connect();// WHY does this block??
LOG.info("Connected..");
// Sleep 5 seconds
TimeUnit.SECONDS.sleep(5);
// Cleanup - Remove listeners
LOG.info("Disposing");
connect.dispose();
disposable.dispose();
disposable2.dispose();
LOG.info("Disposed called");
}
// Just create a test flux
public ConnectableFlux<Object> fluxPrintTime() {
return Flux.create(fluxSink -> {
while (true) {
fluxSink.next(System.currentTimeMillis());
}
}).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
.sample(ofSeconds(2))
.publish();
}
}
运行上面的代码会给出以下输出..它只是以毫秒为单位打印时间,直到我 Ctrl-C 进程..
09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...