0

我已经在后台线程中成功实现了 HiveMQ,但是在激活飞行模式后出现了一个小问题。日志显示了原因,但我认为缺少一些东西,我看不到缺少的错误处理程序在哪里。

日志:

com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
System.err  W  io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call

实现代码:

    client = Mqtt5Client.builder()
            .serverHost(host)
            .serverPort(port)
            .identifier(clientId)
            .addDisconnectedListener(new MqttClientDisconnectedListener() {
                @Override
                public void onDisconnected(MqttClientDisconnectedContext context) {
                    Log.d(TAG, "On disconnected... " + context.getCause());
                }
            })
            .automaticReconnectWithDefaultConfig()
            .buildRx();
    Mqtt5Connect connect = Mqtt5Connect.builder()
            .willPublish()
                .topic(willTopic)
            .applyWillPublish()
            .build();

    Completable connectScenario = client.connect(connect)
            .doOnSuccess(this::connectSuccess)
            .doOnError(this::connectFailed)
            .ignoreElement();

    Single<Mqtt5PublishResult> publishConnect
            = client.publish(Flowable.just(
                    Mqtt5Publish.builder()
                            .topic("d/" + this.clientId + START)
                            .payload(startData.toByteArray())
                            .build())).singleOrError();

            connectScenario
            .andThen(publishConnect)
            .doOnSuccess(this::onConnectSuccess)
            .doOnError(this::disconnectError)
            .subscribe();

肯定缺少一些东西,但问题是我应该在哪里处理断开连接事件。

4

1 回答 1

4

doOnError如果添加回调,RxJava 不会将错误视为已处理。

您可以将错误处理程序添加到subscribe调用中:

connectScenario
    .andThen(publishConnect)
    .subscribe(this::onConnectSuccess, this::disconnectError);

相反,您也可以在doOnError回调中处理错误后忽略该错误:

connectScenario
    .andThen(publishConnect)
    .doOnSuccess(this::onConnectSuccess)
    .doOnError(this::disconnectError)
    .ignoreElement().onErrorComplete()
    .subscribe();

如果您想容忍暂时的网络不可用,您应该使用 sessionExpiryInterval > 0 并自动重新连接。

于 2019-09-19T07:43:05.963 回答