0

我的应用程序实际上有一个 BLE 操作队列来与我的外围设备一起执行。每个操作都从建立与外围设备的连接开始,该外围设备返回Observable<RxBleConnection>. 队列中的第一项启动连接,后续操作只需接收 this (shared) 即可RxBleConnection

简而言之,队列通过以下方式执行:

Observable.concatDelayError(queuedOperations)

如果无法建立连接,或者在一次操作期间断开连接,则剩余的排队操作每次都会重试,尝试重新建立连接。

我决定修改行为,以便一旦连接无效,排队的操作应该总是接收现在无效的RxBleConnection,而不是重新建立新的连接。重试逻辑仍然执行,但在这些情况下会立即失败 --- 操作失败还有其他原因,与连接无关。

为了产生这种行为,我PublishSubject直接在Observable<RxBleConnection>. 这个主题只是简单地代表了原来的RxBleConnection——见下面的代码。如果连接达到错误状态,则后续订阅该主题将收到错误;否则,返回共享连接。这正是我想要的行为,并且在发生错误时它似乎按设计工作;但是,当一切都成功时,我现在遇到了问题。

在更改之前,一旦队列中的所有操作都被消费完,连接就会自动释放。但是,添加 后PublishSubject,操作成功,但连接保持打开状态。使用调试语句,我验证了主题的onUnsubscribeonTerminate从未被调用。原来的RxBleConnection最终超时---并且它onUnsubscribe确实onTerminate被调用了。

想知道我做错了什么导致应用程序保持与外围设备的连接。

private Observable.Transformer<RxBleConnection, RxBleConnection> createConnectionSubject() {
    return rxBleConnectionObservable -> {
        final PublishSubject<RxBleConnection> subject = PublishSubject.create();

        rxBleConnectionObservable.subscribe(
            subject::onNext,
            subject::onError,
            subject::onCompleted);

        return subject;
    };
}
4

1 回答 1

0

PublishSubject不会将取消订阅事件传递给上游。

也许你应该看看ConnectionSharingAdapter课程?

于 2017-08-26T16:24:45.383 回答