我正在尝试订阅以在发出项目时自动取消订阅。基本 observable 是这样创建的。
public static Observable<RxBleConnection> setupConnection(RxBleDevice device, PublishSubject<Void> disconnectTrigger) {
return device
.establishConnection(false)
.takeUntil(disconnectTrigger)
.retry(3)
.retryWhen(o -> o.delay(RETRY_DELAY, TimeUnit.MILLISECONDS))
.compose(new ConnectionSharingAdapter());
}
然后我尝试将三个读取操作组合成一个ProgramModel
.
private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
mConnectionObservable
.flatMap(rxBleConnection ->
// combines the following three observables into a single observable that is
// emitted in onNext of the subscribe
Observable.combineLatest(
rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
ProgramModel::new))
.observeOn(AndroidSchedulers.mainThread())
.take(1)
.subscribe(programModel -> {
programModel.trimSerial();
onReadSuccess.call(programModel);
}, BleUtil::logError);
}
所以理论上一旦程序模型通过oNext
订阅,订阅就会被取消订阅。由于某种原因,操作被卡住并且onNext
永远onError
不会被调用。如果我删除take(1)
它可以正常工作,但我不想处理保留对订阅的引用并手动取消订阅。有谁知道我做错了什么或者为什么onNext
没有被叫到?