我的应用程序实际上有一个 BLE 操作队列来与我的外围设备一起执行。每个操作都从建立与外围设备的连接开始,该外围设备返回Observable<RxBleConnection>
. 队列中的第一项启动连接,后续操作只需接收 this (shared) 即可RxBleConnection
。
简而言之,队列通过以下方式执行:
Observable.concatDelayError(queuedOperations)
如果无法建立连接,或者在一次操作期间断开连接,则剩余的排队操作每次都会重试,尝试重新建立连接。
我决定修改行为,以便一旦连接无效,排队的操作应该总是接收现在无效的RxBleConnection
,而不是重新建立新的连接。重试逻辑仍然执行,但在这些情况下会立即失败 --- 操作失败还有其他原因,与连接无关。
为了产生这种行为,我PublishSubject
直接在Observable<RxBleConnection>
. 这个主题只是简单地代表了原来的RxBleConnection
——见下面的代码。如果连接达到错误状态,则后续订阅该主题将收到错误;否则,返回共享连接。这正是我想要的行为,并且在发生错误时它似乎按设计工作;但是,当一切都成功时,我现在遇到了问题。
在更改之前,一旦队列中的所有操作都被消费完,连接就会自动释放。但是,添加 后PublishSubject
,操作成功,但连接保持打开状态。使用调试语句,我验证了主题的onUnsubscribe
和onTerminate
从未被调用。原来的RxBleConnection
最终超时---并且它onUnsubscribe
确实onTerminate
被调用了。
想知道我做错了什么导致应用程序保持与外围设备的连接。
private Observable.Transformer<RxBleConnection, RxBleConnection> createConnectionSubject() {
return rxBleConnectionObservable -> {
final PublishSubject<RxBleConnection> subject = PublishSubject.create();
rxBleConnectionObservable.subscribe(
subject::onNext,
subject::onError,
subject::onCompleted);
return subject;
};
}