在我的应用程序中,我有两个 verticles(标准一个而不是工人),其中一个,比如说VerticleA
,产生一些消息,另一个,比如说VerticleB
,消耗它们。在VerticleB
我创建了几个消费者来使用这些消息。
class VerticleA : AbstractVerticle() {
val publisher: MessageProducer<String>
fun start(promise: Promise<Void>) {
publisher = vertx.eventBus().sender<String>("address")
.setWriteQueueMaxSize(queueSize)
vertx.setPeriodic(timeout) {
if(publisher.writeQueueFull())
return
getAsyncMessages() { messages ->
messages.forEach { publisher.wirte(it) }
}
}
}
}
class VerticleB : AbstractVerticle() {
val consumers: List<MyConsumers>
override fun start(promise: Promise<Void>) {
// Some initialization
consumers = (1..count).map { createConsumer() }
}
fun createConsumer(): MessageConsumer<String> {
val consumer = vertx.eventBus().consumer<String>("address")
consumer.handler { message ->
consumer.pause()
asyncProcess(message) { consumer.resume() }
}
return consumer
}
}
当我仅使用单个VerticleB
ie实例部署应用程序时,DeploymentOptions.setInstances(1)
一切正常。但是,当我将实例数设置为一个以上时,消费者会在处理少量初始消息后停止处理消息。
从日志中我可以看到每个消费者都消费了几条消息,然后停止消费。暂停和恢复日志也是成对的,即每次pause()
调用都有一个resume()
调用,因此asyncProcess()
即使在出现错误的情况下也会调用回调。消费者也没有结束。
我在这里阅读了核心手册https://vertx.io/docs/vertx-core/java/但我没有看到任何可以解决此问题的内容。