4

我开始使用 Java 9 Flow API,我发现并且不喜欢的第一件事是,当我们将订阅者实现传递给发布者时,我们似乎不能使用 lambda,就像我们可以使用 RxJava 一样

所以我必须定义和实现我自己的订阅者类

public class CustomSubscriber<T> implements Flow.Subscriber<T> {

        protected Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscription done:");
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Got : " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    }

然后把它传给我的出版商

   SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new CustomSubscriber<>());

这真的很冗长,据我了解,这是因为我们需要在onSubscribe回调 中设置订阅

protected Flow.Subscription subscription;     

以后用于onNext继续排放subscription.request(1);

我仍然不明白为什么需要这种机制,但它避免了像我们在 RxJava 中那样使用 Lambda 作为这个例子

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
        e -> System.out.println("do something in the onError"),
        () -> System.out.println("Do something in the onComplete"));

我想这是不可能的,我不会错过任何东西,对吧?

4

2 回答 2

4

我仍然不明白为什么需要这种机制

订阅支持从订阅者到发布者的通信。该request方法允许订阅者应用背压,通知上游组件它已经超载并且“需要休息”。为了做到这一点,订阅者需要保留订阅的一个实例,并且需要偶尔调用request以获取更多项目。

无压力Subscriber

如果您有一个不需要应用背压并且希望从降低的复杂性中受益的用例,您可以实现 a LaidBackSubscriber,它:

  • 通过onSubscribe存储订阅并立即调用request它来实现
  • 通过执行onNext在构造期间给定的 lambda 然后调用来实现subscription.request(1)
  • 实现onErroronComplete通过执行在构造过程中给出的 lambda

那应该可以得到你想要的。

一般建议

Java 9 Flow API作为现有异步库的集成点而创建的,而不是作为以临时方式实现反应式组件的邀请。尝试一下很好,但如果你真的想创建一个反应式系统,现有的库可能非常适合。

于 2017-09-30T14:33:51.380 回答
2

Java 9 Flow API 是一套准系统,包含 4 个接口和 1 个从非反应式世界到反应式世界的桥接类。没有运算符,没有方便的 lambda 版本,没有别的。

理论上,引入它是为了允许 JDK 本身基于响应式原则构建内部组件,但没有令人放心的迹象表明这种情况正在发生。

因此,用户有责任在这个 API 上构建组件,这很困难、乏味且容易出错。你最好等待主流库发布兼容版本,或者坚持使用更多可用的基于 Reactive-Streams.Org 的库,例如 RxJava 2 和 Reactor 3。

如果您仍然对在 Flow API 之上手动构建感兴趣,您可以查看我的研究/原型库Reactive4JavaFlow,它实现了所需的lambda重载。

于 2017-09-30T14:35:49.780 回答