
I am building an Android application that has specific requirements regarding Bluetooth Low Energy.

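I need to write to a write-only characteristic and receive the responses on a separate notification characteristic, and I need to do this in many activities. Is there an Rx way to send a request on the first characteristic, wait for the answer on the second one, and then proceed to another request?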

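Also, to share my RxAndroidBle instance, I thought about creating some kind of BleManager singleton that exposes the Observables, so I can easily subscribe to them in my Presenter. I just want to avoid copying the connection logic into every activity and (ideally) keep a persistent connection. This way I could expose only the connectionObservable and subscribe to it, so I can easily send write requests and receive notifications, but I am sure there is a better way to do it.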

This is what I have for now:

@Singleton
public class BleManager {

  private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create();
  private Observable<RxBleConnection> connectionObservable;
  private boolean isConnected;

  private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID");
  private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID");

  private final RxBleClient bleClient;
  private String mMacAddress;
  private final Context context;
  private RxBleDevice bleDevice;

  @Inject
  public BleManager(Context context, RxBleClient client) {
    Timber.d("Constructing BleManager and injecting members");
    this.context = context;
    this.bleClient = client;
  }

  public void setMacAddress(String mMacAddress) {
    this.mMacAddress = mMacAddress;

    // Set the associated device on MacAddress change
    bleDevice = bleClient.getBleDevice(this.mMacAddress);
  }

  public String getMacAddress() {
    return mMacAddress;
  }

  public RxBleDevice getBleDevice() {
    Preconditions.checkNotNull(mMacAddress);
    return bleClient.getBleDevice(mMacAddress);
  }

  public Observable<RxBleScanResult> getScanSubscription() {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(bleClient);

    return bleClient.scanBleDevices().distinct();
  }

  public Observable<RxBleConnection> getConnectionSubscription() {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(bleDevice);

    if (connectionObservable == null) {
      connectionObservable = bleDevice.establishConnection(context, false)
                                      .takeUntil(disconnectTriggerSubject)
                                      .observeOn(AndroidSchedulers.mainThread())
                                      .doOnUnsubscribe(this::clearSubscription)
                                      .compose(new ConnectionSharingAdapter());
    }

    return connectionObservable;
  }

  public Observable<byte[]> setupListeners() {
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID))
                               .doOnNext(notificationObservable -> Timber.d("Notification Setup"))
                               .flatMap(notificationObservable -> notificationObservable)
                               .observeOn(AndroidSchedulers.mainThread());
  }

  private void triggerDisconnect() {
    disconnectTriggerSubject.onNext(null);
  }


  public Observable<byte[]> writeBytes(byte[] bytes) {
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
      BLE_WRITE_CHARACTERISTIC_UUID,
      bytes)).observeOn(AndroidSchedulers.mainThread());
  }

  private boolean isConnected() {
    return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED;
  }

  /**
   * Will update the UI with the current state of the Ble Connection
   */
  private void registerConnectionStateChange() {
    bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> {
      isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED);
    });
  }

  private void clearSubscription() {
    connectionObservable = null;
  }

}

1 Answer


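I have thought a bit about your use case. By sharing the same connection you introduce state into your application, and that state needs handling, so it is not possible (or at least I have no idea how) to be purely reactive.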

I have focused on establishing a connection and performing a serialized write-notify transmission to a BLE device.

private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create();

private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create();

private Subscription connectionSubscription;

private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time

public void connect() {
    Observable<RxBleConnection> connectionObservable = // your establishing of the connection whether it will be through scan or RxBleDevice.establishConnection()
    final UUID notificationUuid = // your notification characteristic UUID
    final UUID writeUuid = // your write-only characteristic UUID

    connectionSubscription = connectionObservable
            .flatMap(
                    rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications
                    (rxBleConnection, notificationObservable) -> // connection is established and notification prepared
                            inputSubject // waiting for the data-packet to transmit
                                    .onBackpressureBuffer()
                                    .flatMap(bytesAndFilter -> {
                                                return Observable.combineLatest( // subscribe at the same time to
                                                        notificationObservable.take(1), // getting the next notification bytes
                                                        rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device
                                                        (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes
                                                )
                                                        .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier
                                            },
                                            1 // serializing communication as only one Observable will be processed at the same time
                                    )
            )
            .flatMap(observable -> observable)
            .subscribe(
                    response -> { /* ignored here - used only as side effect with outputSubject */ },
                    throwable -> outputSubject.onError(throwable)
            );
}

public void disconnect() {
    if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) {
        connectionSubscription.unsubscribe();
        connectionSubscription = null;
    }
}

public Observable<byte[]> writeData(byte[] data) {
    return Observable.defer(() -> {
                final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response
                inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect()
                return outputSubject
                        .filter(responseIdPair -> responseIdPair.second == uniqueId)
                        .first()
                        .map(responseIdPair -> responseIdPair.first);
            }
    );
}

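I think this is a good approach because the whole flow is described in one place and is therefore easier to understand. The stateful part of the communication (writing the request and waiting for the response) is serialized, and the connection can be kept open until disconnect() is called.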
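
To make the serialization idea easier to try out in isolation, here is a minimal plain-JDK sketch. It is not the RxAndroidBle code above: CompletableFuture stands in for the Observable, a single-thread executor stands in for flatMap(..., 1), and the device is a hypothetical Function that turns a request into a response. The class name SerializedExchange, the pending map, and demo() are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Plain-JDK model of a serialized request/response exchange (no RxJava, no BLE). */
class SerializedExchange {
    // Hypothetical stand-in for "write the request, then take the next notification".
    private final Function<byte[], byte[]> device;
    // One worker thread: only one exchange is in flight at any time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // AtomicInteger makes id generation safe when several threads call writeData().
    private final AtomicInteger nextId = new AtomicInteger();
    // Responses are routed by id, mirroring the filter on outputSubject.
    private final Map<Integer, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    SerializedExchange(Function<byte[], byte[]> device) {
        this.device = device;
    }

    CompletableFuture<byte[]> writeData(byte[] data) {
        final int id = nextId.getAndIncrement();
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        pending.put(id, response);
        worker.execute(() -> {
            CompletableFuture<byte[]> target = pending.remove(id);
            try {
                target.complete(device.apply(data));
            } catch (RuntimeException e) {
                target.completeExceptionally(e);
            }
        });
        return response;
    }

    void disconnect() {
        // Queued exchanges still run; later submissions are rejected by the executor.
        worker.shutdown();
    }

    /** Sends one request, waits for its response, then sends that response as a second request. */
    static byte[] demo() {
        // Fake device: answers with every byte of the request incremented by one.
        SerializedExchange exchange = new SerializedExchange(request -> {
            byte[] response = request.clone();
            for (int i = 0; i < response.length; i++) {
                response[i]++;
            }
            return response;
        });
        try {
            return exchange.writeData(new byte[] {1, 2})
                    .thenCompose(exchange::writeData)
                    .join();
        } finally {
            exchange.disconnect();
        }
    }
}
```

The thenCompose call in demo() plays the role that chaining writeData() calls with flatMap would play in the Rx version: the second request is only submitted once the first response has arrived.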

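The downside is that the transmission relies on the side effects of a different flow. Also, if writeData() is called before the connection is established and the notification is set up, the returned Observable will never complete, though it should not be a problem to handle that scenario with a state check.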
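
One possible shape for such a state check is sketched below in plain JDK Java, again with CompletableFuture in place of the Observable so that it runs without RxJava. The class ConnectionGuard and the methods onReady() and onDisconnected() are hypothetical names; in the Rx code the flag would presumably be set once the notification is set up inside connect() and cleared in disconnect(), and the failure would be returned as an error Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Fails fast instead of returning a result that would never complete. */
class ConnectionGuard {
    // True only while the connection is up and the notification is set up (assumed hooks below).
    private final AtomicBoolean ready = new AtomicBoolean(false);

    void onReady() {
        ready.set(true);
    }

    void onDisconnected() {
        ready.set(false);
    }

    CompletableFuture<byte[]> writeData(byte[] data) {
        if (!ready.get()) {
            // The state check: report the problem right away.
            CompletableFuture<byte[]> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Not connected"));
            return failed;
        }
        // Placeholder for the real exchange: the request is simply echoed back.
        return CompletableFuture.completedFuture(data.clone());
    }

    static boolean failsBeforeConnect() {
        return new ConnectionGuard().writeData(new byte[] {1}).isCompletedExceptionally();
    }

    static boolean succeedsAfterConnect() {
        ConnectionGuard guard = new ConnectionGuard();
        guard.onReady();
        CompletableFuture<byte[]> result = guard.writeData(new byte[] {1});
        return result.isDone() && !result.isCompletedExceptionally();
    }
}
```

A flag like this only covers calls made while disconnected; a request that is already in flight when the connection drops would still need the error path of the connection flow.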

Best Regards

Answered 2016-08-12T13:50:11.777