尝试结合 HiveMQ 的两个特性:共享订阅和持久会话。
如果创建了一个非常简单的消息生产者。和一个非常简单的消费者。当运行多个消费者时,所有消费者都会收到所有消息。
将消费者的 clearSession 设置为“false”后,在运行消费者时,重新启动消费者,消费者在未连接时也会收到消息。出色的。
现在将它与共享订阅功能结合起来。仅使用共享订阅时,clearSession 为“true”。运行多个消费者时,一条消息仅由单个消费者接收。它应该是循环的,情况也是如此,但是一旦您停止消费者,消息就不再是循环的,而是其中一个消费者获得的消息比其他消费者多得多。
如果我现在再次启用持久会话,clearSession 为“假”,并启动共享订阅消费者,消费者开始再次接收所有消息,而不是消息仅传递给一个客户端。
这里有什么问题?这是 HiveMQ 中的错误吗?持久会话和共享订阅不能一起使用吗?那真的会很糟糕。
更新 15/2/2017 正如@fraschbi 建议的那样,我清除了所有数据并再次使用持久会话消费者重新测试了共享订阅。它似乎工作!
但奇怪的是,只有在第一个消费者重新连接后才会收到丢失的消息。所有消费者都有相同的代码,他们只是从不同的 clientId 参数开始。请参阅下面的代码。我的测试顺序:
- 开始消费者1:所有消息都发给这个消费者。
- 开始的消费者2:每个消费者都会收到其他消息。
- 开始的消费者 3:每个消费者收到 3 条消息中的 1 条。
- 停止消费者 1:现在消费者 2 和 3 接收到其他所有消息。(不知道为什么我昨天看到了这种不均匀的分布,但可能正如@fraschbi 提到的那样,因为我正在重用 clientId 并且没有取消订阅或正确断开连接)
- 现在停止消费者2:消费者3现在收到的所有消息。
- 停止 consumer3:不再收到任何消息。
- 重新启动 consumer3:它继续生产者发送的第一条消息。它不接收丢失的消息。
- 重启consumer2:消息再次均匀分布。
- 重新启动 consumer1:这个现在接收所有丢失的消息,然后继续接收每 3 条消息中的 1 条。
所以我的新问题是:为什么只有第一个消费者收到丢失的消息?
注意:这里的技巧仍然是停止客户端时不要取消订阅,因为这样订阅/持久性设置就会丢失!
生产者.scala
object Producer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)
var count = 0
sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}
while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")
Thread.sleep(1000)
count = count + 1
}
}
消费者.scala
object Consumer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = args(1)
// val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) = ()
override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")
override def connectionLost(cause: Throwable) = println("Connection lost")
})
println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);
client.connect(options)
client.subscribe(topic)
sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}
while(true) {
}
}