5

我是 Rxjava 的新手。我有以下代码:

    System.out.println("1: " + Thread.currentThread().getId());
    Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subcriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subcriber.onNext(result);
        }

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

例如,输出将是:

1:50
2:100
3:100

我希望订阅者在 id 为 50 的线程上运行。我该怎么做?

4

5 回答 5

1

我认为有两种情况。要么你需要它在 UI 线程上运行,要么因为同步。据我所知,您不能在特定线程上调用函数,因为调用该方法时,它会绑定到线程的上下文,因此不可能从一个线程调用一个方法到另一个线程。您的问题是订阅者中的方法是从调用的Schedulers.newThread()。我还发现了这个关于Schedulers.currentThread(). 您需要的是在调用观察者时通知调用者线程。

你也可以使用akka,用它编写并发代码更简单。

对不起我的语法不好。

于 2016-07-26T07:40:28.947 回答
0

文档

默认情况下,一个 Observable 和你应用到它的操作符链将在调用它的 Subscribe 方法的同一个线程上完成它的工作,并通知它的观察者。SubscribeOn 操作符通过指定 Observable 应该在其上运行的不同调度程序来更改此行为。ObserveOn 操作符指定了一个不同的调度器,Observable 将使用它来向它的观察者发送通知。

因此,您可以只使用subscribe而不是subscribeOn在创建集合的同一线程上观察您的集合,如下所示:

Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subcriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subcriber.onNext(result);
        }

    }).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

更新: 如果您的应用程序是 Android 应用程序,您可以像您一样在后台线程上使用订阅,并使用Handler消息将结果传递给主线程。

如果您的应用程序是 Java 应用程序,我可能建议使用 wait() 和 notify() 机制,或者考虑使用 EventBus 或 akka 等框架来处理更复杂的场景。

于 2016-07-26T09:43:28.483 回答
-1

使用 RxJava 1.0.15,您可以toBlocking()在之前应用,subscribe并且所有内容都将在创建整个运算符序列的线程上运行。

于 2015-11-06T09:01:46.380 回答
-1

所以 subscribeOn 表示 Observable 将开始在哪个线程上发射项目,observeOn “切换”到 observable 链的其余部分的线程。在订阅之前放置一个 observeOn(schedulers.CurrentThread()) ,您将在创建此 observable 的线程中,而不是在其中执行它的线程中。这是一个很好地解释 rxjava 线程的资源。 http://www.grahamlea.com/2014/07/rxjava-threading-examples/

于 2015-11-06T14:43:42.887 回答
-1

我相信@akarnokd 是对的。您要么在没有的情况下运行,.subscribeOn(Schedulers.newThread())以便它同步发生,要么toBlocking()在订阅之前使用。或者,如果您只想让所有事情都发生在同一个线程上,但不必是当前线程,那么您可以这样做:

Observable
    .defer(() -> Observable.create(...))
    .subscribeOn(Schedulers.newThread())
    .subscribe(subscriber);

defer确保调用create发生在订阅线程上。

于 2016-07-21T10:10:57.233 回答