2

我是 Spring Reactor 的新手。我一直试图了解ConnectableFlux类的工作原理。我已经阅读了文档并查看了在线发布的示例,但遇到了一个问题。

有人能告诉我为什么connect()方法会阻塞吗?我在文档中没有看到任何内容说它应该阻止..特别是因为它返回一个 Disposable 供以后使用。鉴于下面的示例代码,我永远不会通过 connect() 方法。

我试图基本上模拟我过去多次使用的旧式监听器接口范例。我想学习如何使用反应式流重新创建服务类和侦听器架构。我有一个简单的服务类,它有一个名为“ addUpdateListener(Listener l) ”的方法,然后当我的服务类“ doStuff() ”方法触发一些事件传递给任何监听器。

我应该说我将编写一个 API 供其他人使用,所以当我说 Service 类时,我并不是指 Spring 术语中的 @Service。这将是一个普通的 java 单例类。

我只是将 Spring Reactor 用于 Reactive Streams。我也在看 RxJava.. 但想看看 Spring Reactor Core 是否可以工作。

我从下面的测试类开始只是为了理解库语法,然后陷入了阻塞问题。

我认为这里描述了我正在寻找的内容:多个订阅者

更新:通过调试器运行我的代码,ConnectableFlux 连接方法中的代码永远不会返回。它挂在内部连接方法上,并且永远不会从该方法返回。

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
        Disposable[] out = new Disposable[]{null};
        this.connect((r) -> {
            out[0] = r;
        });
        return out[0];
    }

任何帮助都会很棒!

这也是我的 maven pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringReactorTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                    <exclude>*:xml-apis</exclude>
                                    <exclude>org.apache.maven:lib:tests</exclude>
                                    <exclude>log4j:log4j:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;

/**
 * Testing ConnectableFlux
 */
public class Main {

    private final static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        Main m = new Main();

        // Get the connectable
        ConnectableFlux<Object> flux = m.fluxPrintTime();

        // Subscribe some listeners
        // Tried using a new thread for the subscribers, but the connect call still blocks
        LOG.info("Subscribing");
        Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
        Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));

        LOG.info("Connecting...");
        Disposable connect = flux.connect();// WHY does this block??
        LOG.info("Connected..");

        // Sleep 5 seconds
        TimeUnit.SECONDS.sleep(5);

        // Cleanup - Remove listeners
        LOG.info("Disposing");
        connect.dispose();
        disposable.dispose();
        disposable2.dispose();
        LOG.info("Disposed called");
    }

    // Just create a test flux
    public ConnectableFlux<Object> fluxPrintTime() {
        return Flux.create(fluxSink -> {
            while (true) {
                fluxSink.next(System.currentTimeMillis());
            }
        }).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
                .sample(ofSeconds(2))
                .publish();
    }
}

运行上面的代码会给出以下输出..它只是以毫秒为单位打印时间,直到我 Ctrl-C 进程..

09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...
4

1 回答 1

4

我收到了 Spring Reactor 团队的答复,我只是将其发布在这里,以防其他人遇到此问题...

问题的症结在于你在 Flux.create 中进入了一个无限循环。通量被订阅的那一刻,它将进入循环并且永远不会退出,以 CPU 最快的速度生成数据。使用 Flux.create 您至少应该在某个时候调用 sink.complete()。

我建议尝试例如。Flux.interval 作为您的常规滴答声的来源,它将摆脱 Flux.create 的多余复杂性,这让您负责反应流的较低级别概念(您需要的 onNext/onComplete/onError 信号学习,但也许不仅仅是现在)。

作为旁注,我会考虑到使用 Reactor(或 RxJava)模拟基于侦听器的 API 并不能充分体现反应式编程的功能。这是一个受限制的用例,可能会使您的注意力和期望远离反应式编程的真正好处

从更高的角度来看:

ConnectableFlux#connect() 的广泛概念是您有一个“临时”源,您希望在多个订阅者之间共享它,但它会在有人订阅它的那一刻被触发。因此,为了不错过任何事件,您将源转换为 ConnectableFlux,执行一些设置(订阅多个订阅者)并手动触发源(通过调用 connect())。它不是阻塞的,并返回一个表示上游连接的 Disposable`(如果您还想手动取消/释放整个订阅)。

PS:Bismuth 现在明显已经过时了,更喜欢使用最新的 Dysprosium release train

于 2020-05-19T15:28:24.273 回答