1

我正在尝试编写一个使用switchOnNext运算符的简单示例。我想生成无限流的无限流。每个单独的流将生成从 1 到无穷大。

使用switchOnNext我希望每个可观察对象都会发出它的第一个n元素,然后是下一个元素,依此类推。

为了生成一个从 1 到无穷大的值的 observable,我实现了静态rangeInf函数。该main方法包含应该打印值的逻辑。

然而,当运行程序时,只有第一个流被订阅,并且只有它的值被打印到控制台。为了理智起见,我采用了可观察的区间(内置的也是无限的),但那里的行为与预期的一样。

我在这里想念什么?

起初我认为这是因为间隔是在它自己的单独线程上。但是我尝试添加.observeOn(Schedulers.computation())rangeInf方法,但这似乎也不能解决问题。

输出与interval

> Task :Main.main()
Next observable
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
Next observable
id 115: value 1
id 115: value 1
id 115: value 1
id 115: value 1
...

输出与Observable.generate

> Task :Main.main()
Next observable
id 173: value 0
id 173: value 1
id 173: value 2
id 173: value 3
id 173: value 4
id 173: value 5
...

资源

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Main {
    static Random r = new Random();
    public static void main(String[] args) throws InterruptedException {
        Observable<Observable<String>> inf =
                Observable.interval(0, 10, TimeUnit.SECONDS)
                        .map(x -> {
                            int id = r.nextInt(1000);
                            return Main.rangeInf().map(v -> String.format("%d: %d", id, v));
                        })
                        .doOnNext(i -> System.out.println("Next observable"));

        Observable.switchOnNext(inf)
                .subscribe(System.out::println);

        while(true) {
            Thread.sleep(10000);
        }
    }

    public static Observable<Integer> inf() {
        Observable<Integer> inf=  Observable.interval(1, TimeUnit.SECONDS)
                .map(x -> (int) Math.random() * 3000 + 1);

        return inf;

    }
    public static Observable<Integer> rangeInf() {
        // Initial state.
        Supplier<Integer> s = () -> 0;
        // Generator.
        BiFunction<Integer, Emitter<Integer>, Integer> nxt = (i, e) -> {
            e.onNext(i);
            delay(); // delay random amount of time.
            return i + 1;
        };
        return Observable.generate(s, nxt);
    }

    public static void delay() {
        int random = (int) (Math.random() * 1 + 1);
        try {
            Thread.sleep(random * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
4

0 回答 0