The problem I am trying to solve is expressed by the following code:
@Test
public void buffer_shouldZipAllTheThings() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);

    // Both sources are chunked into lists of up to 3 items.
    Observable<List<String>> strings = Observable.from(Arrays.asList("red", "blue", "yellow", "green")).buffer(3);
    Observable<List<Integer>> integers = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).buffer(3);

    // Simple holder for one pair of chunks.
    class Zipped {
        final List<String> strings;
        final List<Integer> integers;

        Zipped(List<String> strings, List<Integer> integers) {
            this.strings = strings;
            this.integers = integers;
        }

        @Override public String toString() {
            return "Strings: { " + strings.toString() + " }\n" + "Integers: { " + integers.toString() + " }";
        }
    }

    Observable<Zipped> zipper = Observable.zip(strings, integers, new Func2<List<String>, List<Integer>, Zipped>() {
        @Override public Zipped call(List<String> strings, List<Integer> integers) {
            return new Zipped(strings, integers);
        }
    });

    zipper.subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Zipped>() {
        @Override public void onCompleted() {
            latch.countDown();
        }

        @Override public void onError(Throwable throwable) {
            // Release the latch on failure too, so the test cannot hang forever.
            latch.countDown();
        }

        @Override public void onNext(Zipped zipped) {
            System.out.println("-- New zipped object: ");
            System.out.println(zipped.toString());
        }
    });

    latch.await();
}
This is the output of this test:
-- New zipped object:
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object:
Strings: { [green] }
Integers: { [4, 5, 6] }
This is the output I want:
-- New zipped object:
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object:
Strings: { [green] }
Integers: { [4, 5, 6] }
-- New zipped object:
Strings: { [] }
Integers: { [7, 8, 9] }
-- New zipped object:
Strings: { [] }
Integers: { [10, 11] }
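
To make the desired behaviour precise: zip stops as soon as the shorter source completes, which is why only two objects are printed above. What I am after is closer to a "zip longest", where the side that has run out contributes an empty list. Here is a minimal plain-Java sketch of those semantics. It does not use RxJava, and the names `ZipLongestSketch`, `buffer`, `chunkOrEmpty`, and `zipLongest` are hypothetical helpers made up for illustration only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ZipLongestSketch {

    // Split a list into consecutive chunks of at most `size` elements,
    // mirroring what buffer(3) emits for a finite source.
    static <T> List<List<T>> buffer(List<T> source, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += size) {
            int end = Math.min(i + size, source.size());
            chunks.add(new ArrayList<>(source.subList(i, end)));
        }
        return chunks;
    }

    // Return the chunk at `index`, or an empty list once that side has run out.
    static <T> List<T> chunkOrEmpty(List<List<T>> chunks, int index) {
        return index < chunks.size() ? chunks.get(index) : Collections.<T>emptyList();
    }

    // Pair chunks by position until the LONGER side is exhausted,
    // formatting each pair the same way Zipped.toString() does.
    static List<String> zipLongest(List<List<String>> strings, List<List<Integer>> integers) {
        List<String> out = new ArrayList<>();
        int n = Math.max(strings.size(), integers.size());
        for (int i = 0; i < n; i++) {
            out.add("Strings: { " + chunkOrEmpty(strings, i) + " }\n"
                    + "Integers: { " + chunkOrEmpty(integers, i) + " }");
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> strings = buffer(Arrays.asList("red", "blue", "yellow", "green"), 3);
        List<List<Integer>> integers = buffer(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 3);
        for (String zipped : zipLongest(strings, integers)) {
            System.out.println("-- New zipped object: ");
            System.out.println(zipped);
        }
    }
}
```

Running `main` on the same inputs as the test should print the four objects listed in the desired output. The open question is how to get the equivalent behaviour from the two buffered Observables, without collecting everything into lists first.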