6

I am new to reactive programming and confused about composing observables that have dependencies. Here is the scenario: There are two observables A, B. Observable A depends on a value emitted by B. (Therefore A needs to observe B). Is there a way to create an Observable C that composes A and B, and emits V? I am just looking for pointers in the RxJava documentation.

4

4 回答 4

1

你的问题对 A 如何依赖于 B 有点模糊,所以我将尝试举几个例子来说明如何组合 observables。

示例 -没有B就无法创建A - 使用 map()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final B b;
    public A(B b) {
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .map(new Func1<B, A>() {
                @Override
                public A call(B b) {
                    return new A(b);
                }
            });
}

示例 - 每次出现B都可以创建零个或多个A - 使用 flatMap()

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return createObservableB()
            .flatMap(new Func1<B, Observable<? extends A>>() {
                @Override
                public Observable<? extends A> call(final B b) {
                    return Observable.create(new Observable.OnSubscribe<A>() {
                        @Override
                        public void call(Subscriber<? super A> subscriber) {
                            for (int i = 0; i < b.value; i++) {
                                subscriber.onNext(new A(i));
                            }
                            subscriber.onCompleted();
                        }
                    });
                }
            });
}

我不确定你对 Observables CV的要求是什么,所以让我们看看更多组合 observables 的方法。

示例 - 组合两个可观察对象发出的每对项目 - 使用 zip()

public class A {
    public final int value;
    public A(int value) {
        this.value = value;
    }
}

public class B {
    public final int value;
    public B(int value) {
        this.value = value;
    }
}

public class C {
    private final A a;
    private final B b;
    public C(A a, B b) {
        this.a = a;
        this.b = b;
    }
}

public Observable<B> createObservableB() {
    return Observable.from(new B(0), new B(1), new B(2), new B(3));
}

public Observable<A> createObservableA() {
    return Observable.from(new A(0), new A(1), new A(2), new A(3));
}

public Observable<C> createObservableC() {
    return Observable.zip(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}

示例 - 合并两个 Observable 的最后一项 - 使用 combineLatest()

// Use the same class definitions from previous example.
public Observable<C> createObservableC1() {
    return Observable.combineLatest(createObservableA(), createObservableB(),
            new Func2<A, B, C>() {
                @Override
                public C call(A a, B b) {
                    return new C(a, b);
                }
            }
    );
}
于 2014-08-13T18:53:06.900 回答
0

我也是响应式编程的新手,只是将一些可能对您的案例感兴趣的代码放在一起

A需要观察B

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertTrue;

public class Q22284380TestCase {

    private static final Logger LOGGER = LoggerFactory.getLogger(
            Q22284380TestCase.class);

    private AtomicBoolean completed = new AtomicBoolean(false);

    @Test
    public void testName() throws Exception {

        final Observable.OnSubscribe<Integer> onSubProduceTwoValues = new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> subscriber) {

                final Thread thread = new Thread(new Runnable() {

                    public Integer i = 0;

                    @Override
                    public void run() {

                        final Integer max = 2;
                        while (i < max) {
                            subscriber.onNext(i);
                            i++;
                        }

                        subscriber.onCompleted();
                    }
                });

                thread.start();
            }
        };

        final Observable<Integer> values = Observable.create(onSubProduceTwoValues);

        final Observable<Integer> byTwoMultiplier = values
                .flatMap(new Func1<Integer, Observable<Integer>>() {

                    @Override
                    public Observable<Integer> call(Integer aValue) {

                        return doubleIt(aValue);

                    }
                });

        byTwoMultiplier.subscribe(new Subscriber<Integer>() {

            @Override
            public void onNext(Integer a) {

                LOGGER.info("" + a);

            }

            @Override
            public void onCompleted() {

                completed.set(true);

            }

            @Override
            public void onError(Throwable e) {

                LOGGER.error(e.getMessage());
            }
        });

        Thread.sleep(1000L);
        assertTrue(completed.get());

    }

    private Observable<Integer> doubleIt(final Integer value) {

        return Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> subscriber) {

                final Thread thread = new Thread(new Runnable() {

                    @Override
                    public void run() {

                        try {
                            subscriber.onNext(value * 2);
                            subscriber.onCompleted();
                        } catch (Throwable e) {
                            subscriber.onError(e);
                        }

                    }
                });

                thread.start();

            }
        });
    }
}

有一个值的生产者,它只是使用flatMap将一个 doubleIt 函数应用于输出。要做一些不同的事情,如果你想要一个由 A 和 B 组合而成的 V ,你可以阅读zip 。

于 2014-05-04T16:21:51.163 回答
0

我认为这取决于您需要做的 A 和 B 之间的组合类型以及 A 如何依赖于 B。

C 是否逐对组成 A 和 B 对(A1 与 B1 结合,A2 与 B2 结合等) - 那么zip将是您想要的功能。但是,在那种情况下,我想知道当你首先将 B 转换为 A 时,你是否不能只做这项工作 - 毕竟我假设你将 B 逐个元素转换为 A (在这种情况下, map 将是要走的路)。

相反,如果您想为 B 发出的每个值创建一个新的 A(但想将所有这些 As 组合成一个 Observable),那么flatMap这就是您所需要的。

如果您确实首先需要 B 来创建A,然后再次需要它来组合A 和 B,那么您可能希望cacheB 为您省去重新计算所有内容的麻烦。

这里还有其他可能感兴趣的函数(如reducecombineLatest)。也许你可以提供更多关于你想要做什么的细节?

于 2014-05-05T07:30:51.723 回答
0

如果您正在寻找使异步可观察对象工作,我建议您快速查看这个问题RxJava Fetching Observables In Parallel

Ben(RxJava 的作者)帮助澄清了我对这个话题的疑惑。

希望这可以帮助

阿南德

于 2014-10-08T18:31:05.377 回答