3

我正在尝试连接到 MQTT 代理。如果连接失败,我想重试。我收到关于连接成功或失败的回调。

在阅读了 retryWhen 的多个示例并处理了异步回调之后,我将这段代码放在了一起。如果我成功连接,它工作正常。此外,如果我e.onError(throwable)Flowable. 但是,如果我e.onError(throwable)从回调的onFailure()方法调用它会使我的 android 应用程序崩溃。

这是代码:

RxJava 链

createConnectionFlowable(client, options)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .retryWhen(createRetryFunction())
    .subscribe(createConsumer());

创建一个Flowable

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {

        public void subscribe(final FlowableEmitter<String> e) throws Exception {
                client.connect(options).setActionCallback(new IMqttActionListener() {
                    public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
                });
        }
    }, BackpressureStrategy.BUFFER);
}

创建重试函数

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {

        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}

消费者:在这里做所有好事

private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}

错误日志

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
                                                                     Process: com.work.app, PID: 16769
                                                                     Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

问题

  1. 为什么此代码会引发导致应用程序崩溃的异常?理想情况下,它应该处理异常?我在这里想念什么?
  2. 为什么不重试3次?
  3. e.onError(throwable)如果我从方法同步调用,为什么相同的代码会正确重试Flowable.subscribe()

参考

  1. RxJava 1.x retryWhen 文档
  2. 这个博客
4

2 回答 2

1
  1. 由于您subscribe使用Consumer<String>您没有为流定义错误处理程序。这意味着错误将通过RxJavaPlugins.getErrorHandler().handleError(...). 在 android 上,这个处理程序似乎会导致致命错误。要解决此问题,请使用 aObserver<String>而不是Consumer<String>
  2. 该日志似乎表明客户端在 Rx 之外进行了 3 次失败(“onFailure”被提及 3 次)。如果我不得不猜测客户端可能是有状态的,这意味着在初始连接后续调用之后会client.connect(...)表现出某种形式的怪异行为,从而导致问题。由于日志显示error - 1 sec wait - error, error我猜回调仍然处于活动状态,因此第二次失败被发送到 RxJava 两次。
  3. 假设您在waitForCompletion()谈论同步时谈论的是方法,它将支持我在 2 中的假设。由于没有注册回调,每个 throwable 只会被报告一次,从而修复行为。

我不确定为什么发射器在终止后仍会保持功能(onError/onComplete),但由于规范要求这些方法仅在可能是导致此问题的未指定行为时才被调用。

于 2016-12-20T19:21:26.697 回答
0

我终于得到了这个工作!

事实证明,这不是 RxJava2 的问题,而是 Mqtt (Eclipse Paho)IMqttActionListener​​ 在主线程上运行回调的方式,即使客户端是在不同的线程上创建的!!!.

对此的简单解决方案是等待客户端连接到创建它的线程上。问题中共享的代码是正确的,除了这种方法

@NonNull
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final FlowableEmitter<Boolean> e) throws Exception {
            IMqttToken connect = client.connect(options);
            connect.waitForCompletion(); //this is blocking and is what was required!!
            if (client.isConnected()) {
                e.onNext(true);
                e.onComplete();
            } else {
                e.onError(connect.getException());
            }

        }
    }, BackpressureStrategy.BUFFER);
}

希望这对使用这些库的人有所帮助:)

于 2016-12-27T16:24:42.643 回答