2

我正在尝试修复现有的 React Native 库react-native-ble-plx在现有的 Java 代码中添加 onBackPressureBuffer() 。

我知道这很难看,但我现在没有时间提交 PR,并且有一个未决的问题可以解决这个问题。我这样做是因为事件发射器工作在 200Hz。我需要一种安全的方式来缓冲本机端的项目,同时它们在 JavaScript 端以自己的速度被消耗。

所以代码变成了下面这样:

       final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
            @Override
            public Observable<Observable<byte[]>> call() {
                int properties = gattCharacteristic.getProperties();
                BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                        .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
                NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                        : NotificationSetupMode.COMPAT;
                if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                    return connection.setupNotification(gattCharacteristic, setupMode);
                }

                if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                    return connection.setupIndication(gattCharacteristic, setupMode);
                }

                return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
            }
        }).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> call(Observable<byte[]> observable) {
                return observable;
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }
        }).subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onError(Throwable e) {
                errorConverter.toError(e).reject(promise);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onNext(byte[] bytes) {
                characteristic.logValue("Notification from", bytes);
                WritableArray jsResult = Arguments.createArray();
                jsResult.pushNull();
                jsResult.pushMap(characteristic.toJSObject(bytes));
                jsResult.pushString(transactionId);
                sendEvent(Event.ReadEvent, jsResult);
            }
        });

我的问题是,即使添加了那个,我也遇到了 MissingBackPressure 异常。

我已经尝试过 onBackPressureDrop() 并且我的行为完全相同。所以我认为我做错了,但现在不知道为什么。

任何帮助表示赞赏。

4

1 回答 1

0

正如您所说,您遇到了一个react-native库问题,并且上面的代码之前确实抛出了MissingBackpressureException

来自.onBackpressureDrop()(粗体我的)的Javadoc:

指示发出项目的速度快于其观察者可以消耗它们的 Observable 丢弃而不是发射其观察者不准备观察的那些项目。

如果下游请求计数达到 0,那么 Observable 将避免调用 {@code onNext},直到观察者再次调用 {@code request(n)} 以增加请求计数。

背压:
操作员接受来自下游的背压并以无限制的方式使用源 {@code Observable}(即,不对其施加背压)。
调度器:
{@code onBackpressureDrop} 默认情况下不会在特定的 {@link Scheduler} 上运行。

您可以看到链中的下一个运算符是.flatMap(),.doOnUnsubscribe().subscribe()

来自.flatMap()关于背压的 Javadoc:

背压:
操作员尊重来自下游的背压。外部 {@code Observable} 在无界模式下使用(即,没有对其施加背压)。内部的 {@code Observable} 应该遵守背压;如果违反,操作员可能会发出 {@code MissingBackpressureException} 信号。

爪哇文档.doOnUnsubscribe()

背压:
{@code doOnUnsubscribe} 不与背压请求或价值传递交互;在其上游和下游之间保留了背压行为。

并且.subscribe()

背压:
操作员以无限制的方式使用源 {@code Observable}(即,没有对其施加背压)。

正如您所看到的,以下操作员都.onBackpressure*()没有对其施加背压。您需要在.onBackpressure*(). 这样的运营商之一是.observeOn(Scheduler)

爪哇文档.observeOn()

背压:此操作符尊重来自下游的背压,并期望来自源 {@code Observable}。违反此期望将导致 {@code MissingBackpressureException}。这是弹出异常的最常见的运算符;在链上查找不支持背压的源,例如 {@code interval}、{@code timer}、{code PublishSubject} 或 {@code BehaviorSubject},并在应用之前应用任何 {@code onBackpressureXXX} 运算符应用 {@code observeOn} 本身。

所以一个可行的代码可能是这样的:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
    @Override
    public Observable<Observable<byte[]>> call() {
        int properties = gattCharacteristic.getProperties();
        BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
        NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                : NotificationSetupMode.COMPAT;
        if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
            return connection.setupNotification(gattCharacteristic, setupMode);
        }

        if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
            return connection.setupIndication(gattCharacteristic, setupMode);
        }

        return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
    }
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> call(Observable<byte[]> observable) {
        return observable;
    }
})
.doOnUnsubscribe(new Action0() {
    @Override
    public void call() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
    @Override
    public void onCompleted() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onError(Throwable e) {
        errorConverter.toError(e).reject(promise);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onNext(byte[] bytes) {
        characteristic.logValue("Notification from", bytes);
        WritableArray jsResult = Arguments.createArray();
        jsResult.pushNull();
        jsResult.pushMap(characteristic.toJSObject(bytes));
        jsResult.pushString(transactionId);
        sendEvent(Event.ReadEvent, jsResult);
    }
});
于 2019-03-21T17:19:40.830 回答