我正在运行 RxJava 并创建一个主题以使用onNext()
方法来生成数据。我正在使用Spring。
这是我的设置:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
在 RxJava 流上生成新数据的方式是通过@Autowire private SubjectObserver subjectObserver
然后调用subjectObserver.publish(newDataObjGenerated)
无论我为subscribeOn()
&指定什么observeOn()
:
- 调度程序.io()
- Schedulers.computation()
- 我的主题
- Schedulers.newThread
它onNext()
和它里面的实际工作是在同一个线程上完成的,这个线程实际上调用onNext()
了主题来生成/产生数据。
这个对吗?如果是这样,我错过了什么?我期待在doSomething()
不同的线程上完成。
更新
在我的调用类中,如果我改变调用publish
方法的方式,那么当然会分配一个新线程供订阅者运行。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
谢谢,