5

我正在使用 Mosquitto Mqtt 和 paho API 在 android 设备上接收推送消息。但是一旦网络连接发生变化,它就会停止接收消息。以下是使用简单测试用例重现问题的步骤:

1)创建一个简单的活动。

2) On Activity StartUp 通过 paho API 连接到 mosquitto 测试服务器 (test.mosquitto.org:1883)。

3)订阅一些主题。

4)向主题发布一些消息。

结果: Mqtt 客户端收到所有发布到该主题的消息。现在

5)禁用移动互联网连接(移动数据)

6)向主题发布一些消息。

7) 重新连接互联网。

结果:客户端在禁用 Internet 连接后未收到任何发布的消息。

由于KeepAliveInterval一直保持较高值(30 分钟),它应该在重新连接到 Internet 后接收所有消息。

相同的用例(相同的代码)适用于简单的 java 项目(非 android),我在笔记本电脑上禁用互联网以运行用例。

知道为什么它不能在安卓设备上运行吗???我错过了什么吗?

笔记:

1) 使用 mqtt-client-0.4.1

2) Android 目标 API 级别 11

3) 测试期间不让设备进入睡眠模式。

4)在connectionLost回调中没有得到任何调用,并且mqtt回调的所有4个线程都在整个测试用例中运行,即mosquitto服务器的连接完好无损。

4

6 回答 6

4

Java 客户端库在一定程度上受制于底层网络 API。调用 publish 时,它会向套接字写入一个 MQTT 数据包。如果该写入失败,则将调用连接丢失,如果该写入有效,则客户端库将继续。您看到的行为差异是因为网络库在这些情况下的行为不同。

MQTT 保活间隔旨在帮助解决此问题。在某些情况下,TCP 连接可能看起来是活动的,但实际上并非如此。这在移动或卫星连接设备上尤其可能 - 您不能期望网络 API 在所有情况下都完全相同。Keepalive 向服务器发送一个 ping 数据包并期待响应 - 如果没有收到该响应,则假定会话已关闭。

如果将 keepalive 间隔设置为 10 秒,则应在 15 到 20 秒内将连接识别为断开。

于 2014-08-13T08:35:06.657 回答
2

您可以将 MqttCallback 侦听器附加到 MqttAsyncclient 。它有回调方法连接丢失,当连接丢失事件发生或 paho 断开连接时将被调用。

于 2014-09-27T10:57:58.790 回答
1

为了解决这个问题,每当互联网连接重新打开时,我都必须对代理进行显式 ping(以及等待 ping 响应的计时器)。如果 ping 失败或计时器超时,我会强制终止现有连接(disconnectForcibly),然后显式调用 connectionLost 方法。(然后仅从 connectionLost 方法重新连接)。

于 2014-09-29T05:39:49.260 回答
1

我已经遇到了这个问题,并通过检查 MqttAndroidClient 连接并.isConnected()在时间间隔内使用来修复它。

于 2018-12-19T07:06:47.697 回答
1

为您服务:-

 //Receiver that notifies the Service when the phone gets data connection
  private NetworkConnectionIntentReceiver netConnReceiver;

创建以下类:-

/*
* Called in response to a change in network connection - after losing a
*  connection to the server, this allows us to wait until we have a usable
*  data connection again
*/
class NetworkConnectionIntentReceiver extends BroadcastReceiver
{
  private static  String TAG ="NetworkConnectionIntentReceiver";
  @Override
  public void onReceive(Context ctx, Intent intent)
  {
    // we protect against the phone switching off while we're doing this
    //  by requesting a wake lock - we request the minimum possible wake
    //  lock - just enough to keep the CPU running until we've finished

    PowerManager pm = (PowerManager) ctx.getSystemService(ctx.POWER_SERVICE);
    PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT");
    wl.acquire();

    Connection c = Connections.getInstance(ctx).getConnection(clientHandle);
    final ActionListener callback = new ActionListener(ctx,
                ActionListener.Action.CONNECT, clientHandle,null);
    c.getClient().setCallback(new MqttCallbackHandler(ctx, clientHandle,messenger_where_incoming_messages_tobe_sent));
    c.getClient().connect(c.getConnectionOptions(), null, callback);

    /*    The Above Reconnect Logic can be put up in a Reconnect() function.
     *    OR WRITE Any Other LOGIC TO RECONNECT TO MQTT
     */       

    // we're finished - if the phone is switched off, it's okay for the CPU
    //  to sleep now
    wl.release();
}

现在在 OnResume() 或 onCreate 中适当的地方调用以下方法来注册 BroadcastReceiver。

synchronized void handleNetworkChange()
{

    // changes to the phone's network - such as bouncing between WiFi
    //  and mobile data networks - can break the MQTT connection
    // the MQTT connectionLost can be a bit slow to notice, so we use
    //  Android's inbuilt notification system to be informed of
    //  network changes - so we can reconnect immediately, without
    //  haing to wait for the MQTT timeout
    if (netConnReceiver == null)
    {
        netConnReceiver = new NetworkConnectionIntentReceiver();
        registerReceiver(netConnReceiver,
                new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));

    }
}
于 2015-12-11T05:39:13.340 回答
1

我修复了以下错误(使用 rxJava2,但不是必需的):

    public void reconnect() {
        Completable.create(emitter -> {
            while (!mqttClient.isConnected()) {
                mqttClient.connect(options, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        emitter.onComplete();
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LogHelper.d(TAG,"try to connect failed");
                    }
                });

                Thread.sleep(2000);
            }
            emitter.onComplete();
        })
        .subscribeOn(Schedulers.io())
        .subscribe();
    }

和一个示例调用

private BroadcastReceiver changeNetworkStateReceiver = new BroadcastReceiver() {
    @Override
    public void onReceive(Context context, Intent intent) {
        if (Objects.equals(intent.getAction(), NetworkStateReceiver.EVENT_CHANGE_NETWORK_STATE)) {
            if(Utils.isOnline(context)) {
                mqttClient.reconnect();
            }
        }
    }
};
于 2018-03-26T18:21:01.770 回答