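I have a question about MissingBackpressureException.

I added a few .onBackpressureDrop() calls just for testing, but the exception is still thrown.

I also enabled RxJavaHooks.enableAssemblyTracking() to get more detail in the log.

The exception is thrown after 1-3 minutes.

Any idea what is wrong with this code?

Thanks for the help.

Code that throws the exception: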

  Subscription notifySubscription = connection.setupNotification(notifyCharacteristic)
                                    .onBackpressureBuffer()
                                    .doOnNext(new Action1<Observable<byte[]>>() {
                                        @Override
                                        public void call(Observable<byte[]> observable) {
                                            Log.d(TAG, "Notifications set, calling bypassConnect()");

                                            // Bypass connect in WB to make it aware of this new device
                                            String wbAddress = addressMap.getOrCreateWbAddress(bleMac);
                                            bleWrapper.bypassConnect(wbAddress);
                                        }
                                    })
                                    .onBackpressureDrop()
                                    .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                                        @Override
                                        public Observable<byte[]> call(Observable<byte[]> observable) {
                                            return observable;
                                        }
                                    })
                                    .onBackpressureDrop()
                                    .subscribe(new Action1<byte[]>() {
                                        @Override
                                        public void call(byte[] bytes) {
                                            dataAvailable(bleMac, bytes);
                                        }
                                    }, new Action1<Throwable>() {
                                        @Override
                                        public void call(Throwable throwable) {
                                            Log.e(TAG, "dataAvailable() Error: ", throwable);
                                        }
                                    });

Log:

rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:325)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at com.jakewharton.rxrelay.RelaySubscriptionManager$RelayObserver.onNext(RelaySubscriptionManager.java:205)
at com.jakewharton.rxrelay.PublishRelay.call(PublishRelay.java:47)
at com.jakewharton.rxrelay.SerializedAction1.call(SerializedAction1.java:84)
at com.jakewharton.rxrelay.SerializedRelay.call(SerializedRelay.java:20)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$4.onCharacteristicChanged(RxBleGattCallback.java:139)
at android.bluetooth.BluetoothGatt$1.onNotify(BluetoothGatt.java:438)
at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:399)
at android.os.Binder.execTransact(Binder.java:453)
Caused by: rx.exceptions.AssemblyStackTraceException: Assembly trace:
at rx.Observable.unsafeCreate(Observable.java:162)
at rx.Observable.lift(Observable.java:299)
at rx.Observable.merge(Observable.java:2572)
at rx.Observable.merge(Observable.java:2914)
at rx.Observable.merge(Observable.java:2637)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback.getOnCharacteristicChanged(RxBleGattCallback.java:311)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.observeOnCharacteristicChangeCallbacks(NotificationAndIndicationManager.java:186)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.access$400(NotificationAndIndicationManager.java:31)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:110)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:107)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
at rx.in
1 Answer

It looks like the Observable returned by rxandroidble does not support backpressure, while flatMap expects it, so you have to apply .onBackpressureDrop() inside the flatMap:

.flatMap(observable -> observable.onBackpressureDrop())

Note that operators applied outside flatMap generally do not affect what happens inside it while the sources are being merged.
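
To make the reasoning concrete, here is a toy model in plain Java. It does not use RxJava at all. BoundedBuffer, DropStage and the capacity of 16 are hypothetical stand-ins for the bounded queue that flatMap keeps for an inner source and for what .onBackpressureDrop() roughly does (forward only while the downstream has outstanding demand). It is a simplified sketch: the buffer is never drained and demand is never replenished, which the real operators do.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackpressureSketch {

    /** Toy stand-in for the bounded queue flatMap keeps for an inner source. */
    static final class BoundedBuffer {
        final int capacity;
        final Queue<Integer> queue = new ArrayDeque<>();

        BoundedBuffer(int capacity) { this.capacity = capacity; }

        void onNext(int item) {
            if (queue.size() >= capacity) {
                // Plays the role of MissingBackpressureException in the trace.
                throw new IllegalStateException("buffer overflow");
            }
            queue.add(item);
        }
    }

    /** Toy drop stage: forwards only while the downstream still has demand. */
    static final class DropStage {
        final BoundedBuffer downstream;
        long requested;
        int dropped;

        DropStage(BoundedBuffer downstream) {
            this.downstream = downstream;
            this.requested = downstream.capacity; // one buffer's worth of demand
        }

        void onNext(int item) {
            if (requested > 0) {
                requested--;
                downstream.onNext(item);
            } else {
                dropped++; // no demand left: discard instead of overflowing
            }
        }
    }

    /** Hot source pushes straight into the buffer; true if it overflowed. */
    static boolean overflowsWithoutDrop(int capacity, int items) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        try {
            for (int i = 0; i < items; i++) buffer.onNext(i);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Same source routed through the drop stage; returns the dropped count. */
    static int droppedWithDropStage(int capacity, int items) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        DropStage drop = new DropStage(buffer);
        for (int i = 0; i < items; i++) drop.onNext(i);
        return drop.dropped;
    }

    public static void main(String[] args) {
        System.out.println(overflowsWithoutDrop(16, 100)); // prints "true"
        System.out.println(droppedWithDropStage(16, 100)); // prints "84"
    }
}
```

In this model the drop stage only helps because it sits between the hot source and the buffer. Anything placed after the buffer never sees the overflow, which mirrors why the .onBackpressureDrop() calls outside flatMap in the question had no effect.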

answered 2017-07-31T07:56:41.400