1

我正在运行 RxJava 并创建一个主题以使用onNext()方法来生成数据。我正在使用Spring

这是我的设置:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

在 RxJava 流上生成新数据的方式是通过@Autowire private SubjectObserver subjectObserver 然后调用subjectObserver.publish(newDataObjGenerated)

无论我为subscribeOn()&指定什么observeOn()

  • 调度程序.io()
  • Schedulers.computation()
  • 我的主题
  • Schedulers.newThread

onNext()和它里面的实际工作是在同一个线程上完成的,这个线程实际上调用onNext()了主题来生成/产生数据。

这个对吗?如果是这样,我错过了什么?我期待在doSomething()不同的线程上完成。

更新

在我的调用类中,如果我改变调用publish方法的方式,那么当然会分配一个新线程供订阅者运行。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

谢谢,

4

1 回答 1

8

每个运算符 on Observable/Subject返回一个具有额外行为的新实例,但是,您的代码只是应用subscribeOnobserveOn然后丢弃它们生成的任何内容并订阅 raw Subject。您应该链接方法调用,然后订阅:

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();

safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
     @Override
     public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
         LOGGER.debug("{} invoked.", Thread.currentThread().getName());
         doSomething();
     }
});

请注意,这subscribeOn对 a 没有实际影响,PublishSubject因为它的方法中没有发生订阅副作用subscribe()

于 2016-10-25T21:39:19.037 回答