我正在尝试修复现有的 React Native 库react-native-ble-plx在现有的 Java 代码中添加 onBackPressureBuffer() 。
我知道这很难看,但我现在没有时间提交 PR,并且有一个未决的问题可以解决这个问题。我这样做是因为事件发射器工作在 200Hz。我需要一种安全的方式来缓冲本机端的项目,同时它们在 JavaScript 端以自己的速度被消耗。
所以代码变成了下面这样:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
}).onBackpressureBuffer(1000) <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
}).subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});
我的问题是,即使添加了那个,我也遇到了 MissingBackPressure 异常。
我已经尝试过 onBackPressureDrop() 并且我的行为完全相同。所以我认为我做错了,但现在不知道为什么。
任何帮助表示赞赏。