1

我有两个 observables:第一个来自库 RxAndroidBle:

Observable<RxBleConnection> bluetoothObservable = RxBleClient.create(getBaseContext()).getBleDevice(macAddress)
.establishConnection(false)

哪个连接到设备并在有订阅者时保持连接,另一个

Observable<Response> serverObservable = Observable.fromCallable(() -> callServer())

然后我把它们拉在一起

bluetoothObservable.zipWith(serverObservable , (rxBleConnection, s) -> {
                                Log.d(TAG, "zip done");
                                return "mock result";
                            }).subscribe((s) -> {},
                                    Throwable::printStackTrace);

但是在 zipbluetoothObservable被取消订阅并且连接立即下降之后。我应该怎么做才能压缩这些 observables 并保持bluetoothObservable活动/订阅?

4

1 回答 1

2

而不是.zip()你可以使用:

Observable.combineLatest(
  bluetoothObservable, 
  serverObservable,
  (rxBleConnection, s) -> {
    Log.d(TAG, "combined");
    return "mock result";
  }
)
  .subscribe(
    (s) -> {},
    Throwable::printStackTrace
  )

解释:zip尝试将两个Observables 的输出一一合并。如果其中一个将完成并且之前的所有排放Observable都匹配 - 没有必要保留对另一个的订阅,Observable因为后续排放将不会被使用。combineLatest只是试图将两个Observables 的所有排放组合成最新的对。

于 2017-07-05T12:58:09.813 回答