0

我有以下代码:

private static void log(Object msg) {
        System.out.println(
                Thread.currentThread().getName() +
                        ": " + msg);
}

Observable<Integer> naturalNumbers = Observable.create(emitter -> {
            log("Invoked"); // on main thread
            Runnable r = () -> {
                log("Invoked on another thread");
                int i = 0;
                while(!emitter.isDisposed()) {
                    log("Emitting "+ i);
                    emitter.onNext(i);
                    i += 1;
                }
            };
            new Thread(r).start();
        });
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));

所以这里我们有 2 个重要的 lambda 表达式。第一个是我们传递给 Observable.create 的那个,第二个是我们传递给 Observable.subscribe() 的回调。在第一个 lambda 中,我们创建一个新线程,然后在该线程上发出值。在第二个 lambda 中,我们有代码来接收在第一个 lambda 代码中发出的那些值。我观察到两个代码都在同一个线程上执行。

Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2

为什么会这样?默认情况下,RxJava 是否在同一线程上运行代码发射值(可观察)和代码接收值(观察者)?

4

1 回答 1

1

让我们看看,如果你使用 aThread来执行一个 runnable 会发生什么:

测试

@Test
  void threadTest() throws Exception {
    log("main");
    CountDownLatch countDownLatch = new CountDownLatch(1);

    new Thread(
            () -> {
              log("thread");
              countDownLatch.countDown();
            })
        .start();

    countDownLatch.await();
  }

输出

main: main
Thread-0: thread

看来,主入口点是从main线程调用的,而新创建的入口点是Thread调用Thread-0.

为什么会这样?默认情况下,RxJava 是否在同一线程上运行代码发射值(可观察)和代码接收值(观察者)?

默认情况下RxJava是单线程的。因此,生产者,如果没有通过不同的定义observeOn或不同的线程布局,将在(订阅者)线程subscribeOn上发出值。consumer这是因为RxJava默认情况下会在订阅堆栈上运行所有内容。

示例 2

@Test
  void fdskfkjsj() throws Exception {
      log("main");

      Observable<Integer> naturalNumbers =
        Observable.create(
            emitter -> {
              log("Invoked"); // on main thread
              Runnable r =
                  () -> {
                    log("Invoked on another thread");
                    int i = 0;
                    while (!emitter.isDisposed()) {
                      log("Emitting " + i);
                      emitter.onNext(i);
                      i += 1;
                    }
                  };
              new Thread(r).start();
            });
    Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));

    Thread.sleep(100);
  }

输出2

main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1

在您的示例中,很明显,main 方法是从主线程调用的。此外,subscribeActual调用也在调用线程 ( main) 上运行。但是Observable#createlambdaonNext从新创建的线程调用Thread-0。该值从调用线程推送给订阅者。在这种情况下,调用线程是Thread-0,因为它调用onNext下游订阅者。

如何区分生产者和消费者?

使用observeOn/subscribeOn运算符来处理RxJava.

我应该使用低级线程构造ẁith RxJava 吗?

不,您不应该使用new Thread以将生产者与消费者分开。违约很容易,onNext不能同时调用(交错),因此违约。这就是为什么RxJava提供一个名为Schedulerwith Workers 的构造来减轻此类错误的原因。

注意:我认为这篇文章描述得很好:http: //introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html。请注意,这是 Rx.NET,但原理完全相同。如果您想阅读有关并发的信息,RxJava还可以查看 Davids 博客(https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html)或阅读本书(使用 RxJava 进行反应式编程https:// /www.oreilly.com/library/view/reactive-programming-with/9781491931646/

于 2021-01-05T18:17:13.957 回答