0

我将更多地使用 RxJava 和 ReactFX,但我想了解的是如何协调两者,因为 ReactFX 没有 RxJava 依赖项,那么两者如何在同一个 monad 中相互交谈?对于没有大量样板ObservableValue的 JavaFX、RxJavaObservable和 ReactFX之间的桥接尤其如此。StreamEvent

我想用 RxJava 编写我的核心业务逻辑,因为它们并不总是支持 JavaFX 应用程序。但我希望 JavaFX UI 能够使用ReactFX和利用EventStream. EventStream所以我的问题是将 an变成 anObservable和 anObservable变成EventStream,Binding或的最有效方法是什么ObservableValue?我知道我可以全面使用 RxJava,但我想利用 ReactFX 的平台线程安全性和便利性......

//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();

//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();
4

2 回答 2

3

要将 ReactFXEventStream转换为 RxJava Observable

Observable<Foo> toRx(EventStream<Foo> es) {
    PublishSubject<Foo> sub = PublishSubject.create();
    es.subscribe(sub::onNext);
    return sub;
}

要将 RxJavaObservable变成 ReactFX EventStream

EventStream<Foo> fromRx(Observable<Foo> obs) {
    EventSource<Foo> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
    return es;
}

注意Platform.runLater(...)后一种情况。这使得在EventStreamJavaFX 应用程序线程上产生的发出事件。

另请注意,在这两种情况下,我们都忽略了Subscriptionsubscribe方法返回的 s。如果您正在为应用程序的生命周期建立绑定,这很好。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,您将让您的 RxJava 组件公开Subject,您的 ReactFX 组件公开EventStream,然后根据需要执行subscribe/ unsubscribe。对于第二种情况也是如此。

于 2015-05-14T17:23:52.937 回答
2

我不熟悉 ReactFX,但查看 API 我可以推断出这些转换:

public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
    return Observable.create(child -> {
        Subscription s = es.subscribe(child::onNext);
        child.add(Subscriptions.create(s::unsubscribe));
    });
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
    return new EventStream<T>() {
        final Vector<Consumer<? super T>> observers = new Vector<>();
        @Override
        public void addObserver(Consumer<? super T> observer) {
            observers.add(observer);
        }

        @Override
        public void removeObserver(Consumer<? super T> observer) {
            observers.remove(observer);
        }
        @Override
        public Subscription subscribe(Consumer<? super T> subscriber) {
            addObserver(subscriber);

            rx.Subscriber<T> s = new rx.Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    subscriber.accept(t);
                }
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    removeObserver(subscriber);
                }
                @Override
                public void onCompleted() {
                    removeObserver(subscriber);
                }
            };
            o.subscribe(s);

            return () -> {
                s.unsubscribe();
                removeObserver(subscriber);
            };
        }
    };
}

两者都应该为您提供取消订阅功能,尽管 ReactFX 不支持同步取消订阅,而且我真的不知道 EventStream 是否可以用作热或冷可观察对象。我无法访问 Binding,所以无法帮助您。

于 2015-05-14T17:42:28.420 回答