我正在尝试连接到 MQTT 代理。如果连接失败,我想重试。我收到关于连接成功或失败的回调。
在阅读了 retryWhen 的多个示例并处理了异步回调之后,我将这段代码放在了一起。如果我成功连接,它工作正常。此外,如果我e.onError(throwable)
从Flowable
. 但是,如果我e.onError(throwable)
从回调的onFailure()
方法调用它会使我的 android 应用程序崩溃。
这是代码:
RxJava 链
createConnectionFlowable(client, options)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(createRetryFunction())
.subscribe(createConsumer());
创建一个Flowable
private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
return Flowable.create(new FlowableOnSubscribe<String>() {
public void subscribe(final FlowableEmitter<String> e) throws Exception {
client.connect(options).setActionCallback(new IMqttActionListener() {
public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
});
}
}, BackpressureStrategy.BUFFER);
}
创建重试函数
private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
return new Function<Flowable<Throwable>, Publisher<?>>() {
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.zipWith(
Flowable.range(1, 3),
new BiFunction<Throwable, Integer, Integer>() {
public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
}
)
.flatMap(new Function<Integer, Publisher<?>>() {
public Publisher<?> apply(Integer integer) throws Exception {
return Flowable.timer(integer, TimeUnit.SECONDS);
}
});
}
};
}
消费者:在这里做所有好事
private Consumer<String> createConsumer() {
return new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, "accept: do important stuff here" + s);
}
};
}
错误日志
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.work.app, PID: 16769
Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms:
问题
- 为什么此代码会引发导致应用程序崩溃的异常?理想情况下,它应该处理异常?我在这里想念什么?
- 为什么不重试3次?
e.onError(throwable)
如果我从方法同步调用,为什么相同的代码会正确重试Flowable.subscribe()
?
参考