4

我使用一次性 Flowable 来发送和订阅项目。但是当我尝试使用 ConnectableFlowable 时,我无法向发射器发送取消信号。我怎么能理解 flowable 被放置在 Flowable.create 方法中?

您可以通过注释和取消注释“publish().autoConnect()”代码来查看场景。

Disposable disposable = Flowable.create(emitter -> {
        AtomicBoolean isRunning = new AtomicBoolean(true);
        AtomicInteger i = new AtomicInteger();
        new Thread(() -> {
            while (isRunning.get()) {
                i.getAndIncrement();
                System.out.println("Emitting:" + i.get());
                emitter.onNext(i.get());
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        emitter.setCancellable(() -> {
            System.out.println("Cancelled");
            isRunning.set(false);
        });
    }, BackpressureStrategy.BUFFER)
            .publish() //comment here
            .autoConnect() //and here
            .subscribe(s -> {
                System.out.println("Subscribed:" + s);
            });

    Thread.sleep(10_000);
    disposable.dispose();
    Thread.sleep(100_000);
4

1 回答 1

2

有一个重载可让您访问Disposable取消连接:

SerialDisposable sd = new SerialDisposable();

source.publish().autoConnect(1, sd::set);

// ...

sd.dispose();
于 2020-12-07T09:16:58.197 回答