我正在尝试编写一个使用switchOnNext
运算符的简单示例。我想生成无限流的无限流。每个单独的流将生成从 1 到无穷大。
使用switchOnNext
我希望每个可观察对象都会发出它的第一个n
元素,然后是下一个元素,依此类推。
为了生成一个从 1 到无穷大的值的 observable,我实现了静态rangeInf
函数。该main
方法包含应该打印值的逻辑。
然而,当运行程序时,只有第一个流被订阅,并且只有它的值被打印到控制台。为了理智起见,我采用了可观察的区间(内置的也是无限的),但那里的行为与预期的一样。
我在这里想念什么?
起初我认为这是因为间隔是在它自己的单独线程上。但是我尝试添加.observeOn(Schedulers.computation())
该rangeInf
方法,但这似乎也不能解决问题。
输出与interval
> Task :Main.main()
Next observable
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
Next observable
id 115: value 1
id 115: value 1
id 115: value 1
id 115: value 1
...
输出与Observable.generate
> Task :Main.main()
Next observable
id 173: value 0
id 173: value 1
id 173: value 2
id 173: value 3
id 173: value 4
id 173: value 5
...
资源
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Main {
static Random r = new Random();
public static void main(String[] args) throws InterruptedException {
Observable<Observable<String>> inf =
Observable.interval(0, 10, TimeUnit.SECONDS)
.map(x -> {
int id = r.nextInt(1000);
return Main.rangeInf().map(v -> String.format("%d: %d", id, v));
})
.doOnNext(i -> System.out.println("Next observable"));
Observable.switchOnNext(inf)
.subscribe(System.out::println);
while(true) {
Thread.sleep(10000);
}
}
public static Observable<Integer> inf() {
Observable<Integer> inf= Observable.interval(1, TimeUnit.SECONDS)
.map(x -> (int) Math.random() * 3000 + 1);
return inf;
}
public static Observable<Integer> rangeInf() {
// Initial state.
Supplier<Integer> s = () -> 0;
// Generator.
BiFunction<Integer, Emitter<Integer>, Integer> nxt = (i, e) -> {
e.onNext(i);
delay(); // delay random amount of time.
return i + 1;
};
return Observable.generate(s, nxt);
}
public static void delay() {
int random = (int) (Math.random() * 1 + 1);
try {
Thread.sleep(random * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}