0

我正在使用具有两个协同工作的特性的 BLE GATT 服务。一个是只写特性,您可以将字符串值作为查询提交,另一个是只通知特性,您可以在其中接收对查询的响应。

通知服务有点慢,重要的是不要在读取通知之前继续下一个查询——否则响应会丢失。

为此,我一直在使用 RxAndroidBle Observables,具有用于写入和通知特性的单独通道。第三个 Observable 提供查询。但是,写入的速度太快了。

ConnectableObservable notifyObservable =
    createNotifyObservable(NOTIFY_UUID).publish();

queryObserverable
    .doOnSubscribe(notifyObservable::connect)
    .doOnNext(query -> Log.d(TAG, "Processing query: " + query))
    .flatMap(query -> createWriteObservable(WRITE_UUID, query)))
    .doOnNext(request -> Log.d(TAG, "Write initiated."))
    .flatMap(request -> notifyObservable)
    .doOnNext(response -> Log.d(TAG, "Query response: " + response));

所以在运行应用程序时,这是我在日志中看到的(包括时间戳、进程 ID 和线程 ID):

06-22 14:30:01.991 14085-15360 Processing query: Query1
06-22 14:30:02.011 14085-15360 Processing query: Query2
06-22 14:30:02.011 14085-15360 Processing query: Query3
06-22 14:30:07.261 14085-15443 Write initiated.
06-22 14:30:07.301 14085-15445 Write initiated
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.321 14085-15449 Write initiated.
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.351 14085-15453 Query response: Response3

RxJava 有没有办法确保下一次写入仅在收到响应后发生?

编辑:在maxConnection为两个调用指定建议的参数时flatMap,调用以正确的顺序发生,但仅适用于来自 Observable 的第一个查询。这是此案例的日志:

06-22 14:39:09.841 22245-23079 Processing query: Query1
06-22 14:39:15.131 22245-23166 Write initiated.
06-22 14:39:15.201 22245-23169 Query response: Response1
4

2 回答 2

1

尝试将 maxConcurrent = 1 参数传递给您的 flatMap 调用:

...
.flatMap(query -> createWriteObservable(WRITE_UUID, query), 1)
...
.flatMap(request -> createNotifyObservable(NOTIFY_UUID), 1)
...
于 2016-06-22T18:24:56.350 回答
1

如果每个查询只有一个响应,那么您可以将查询打包在一个函数中:

Observable<String> query(String query) {
  return notifyObservable.flatMap(notifyBytesObservable -> // to be sure that when we will start writing we're already listening
    Observable.combineLatest( // with combineLatest both Observables will be subscribed at the same time
      notifyBytesObservable.first(), // to unsubscribe from notification after the first one
      createWriteObservable(uuid, query), 
      { notificationBytes, writtenBytes -> notificationBytes }
    )
  )
    .map(notificationBytes -> String(notificationBytes))
}

现在你应该可以像这样拨打电话了:

queryObserverable
  .flatMap(queryString -> query(queryString), 1)

我希望这可以帮助你。

此致

于 2016-07-08T15:38:49.257 回答