0

预期行为

我希望有一个回调来收听我订阅的每个主题,每条消息发送一次。我的意思是,我想订阅一个主题 1000 次,但是当收到一条消息时,我只想听一次。

IDK 如果我做错了什么(我猜)。

实际行为

  • 我正在开发一个家庭安全摄像头应用程序。
  • 我有一个我拥有的相机列表。
  • 对于列表中的每台相机,我都会订阅一个主题。
  • 每 30 秒,我更新一次屏幕,并再次为每台相机订阅一个主题。这意味着一个主题可以被多次订阅。
  • 每次我收到有关某个主题的消息时,回调都会触发有关同一主题被订阅多少次的消息。

重现

脚步

  1. 有一个主题相机/123
  2. 使用以下名为subscribeWith的方法订阅主题 N 次
  3. 通过摄像头发送消息/123
  4. 您将收到 N 次消息,因为您订阅了该主题 N 次

复制代码

只是变量

private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883

构建 MQTT

private fun build() {
        if (mqtt != null) return

        mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { Timber.d("On Connected") }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()
    }

连接 MQTT

fun connect(username: String, password: String) {
        build()

        this.username = username
        this.password = password

        mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username("abc")
                ?.password("123".toByteArray())
                ?.applySimpleAuth()
                ?.send()
    }

然后,订阅一个主题 每次我订阅一个主题时,我都会使用这些乐趣

fun subscribeWith(topic: String) {
        mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_MOST_ONCE)
               ?.callback { t -> onConsumingTopic(t) }  <- I THINK THIS IS THE IMPORTANT THING
                ?.send()
                ?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }

    }
4

2 回答 2

1

正如评论中提到的,目前唯一的解决方案是将订阅的主题列表保留在 MQTT 客户端库之外,并在订阅新主题之前对其进行检查。

于 2020-12-18T15:07:05.213 回答
1

我找到了正确的答案。

无需为每个订阅调用注册回调,也无需使用全局数组来处理注册的主题,如下所示:

mqtt?.subscribeWith()
    ?.callback { t -> onConsumingTopic(t) }  <- This is not needed

相反,您可以为所有消息注册一个“全局”回调,例如:

client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );

然后您可以在不提供回调的情况下订阅。

client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();

完整示例:

构建 MQTT

 mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { onMQTTConnected(it) }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()

        mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }

连接 MQTT

mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username(context.getString(R.string.mqtt_user))
                ?.password(context.getString(R.string.mqtt_pw).toByteArray())
                ?.applySimpleAuth()
                ?.send()

订阅主题

mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_LEAST_ONCE)
                ?.send()
                ?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }
于 2021-02-10T18:51:53.897 回答