0

在我的应用程序中,我有两个 verticles(标准一个而不是工人),其中一个,比如说VerticleA,产生一些消息,另一个,比如说VerticleB,消耗它们。在VerticleB我创建了几个消费者来使用这些消息。

class VerticleA : AbstractVerticle() {
  val publisher: MessageProducer<String>
  
  fun start(promise: Promise<Void>) {

    publisher = vertx.eventBus().sender<String>("address")
               .setWriteQueueMaxSize(queueSize)

    vertx.setPeriodic(timeout) {
      if(publisher.writeQueueFull())
        return

      getAsyncMessages() { messages ->
         messages.forEach { publisher.wirte(it) }
      }
    }
  }
}

class VerticleB : AbstractVerticle() {
  
  val consumers: List<MyConsumers>
  
  override fun start(promise: Promise<Void>) {
    // Some initialization
    
    consumers = (1..count).map { createConsumer() }
  }

  fun createConsumer(): MessageConsumer<String> {
    val consumer = vertx.eventBus().consumer<String>("address")
    
    consumer.handler { message -> 
      consumer.pause()
      
      asyncProcess(message) { consumer.resume() }
    }
    return consumer
  }
}

当我仅使用单个VerticleBie实例部署应用程序时,DeploymentOptions.setInstances(1)一切正常。但是,当我将实例数设置为一个以上时,消费者会在处理少量初始消息后停止处理消息。

从日志中我可以看到每个消费者都消费了几条消息,然后停止消费。暂停和恢复日志也是成对的,即每次pause()调用都有一个resume()调用,因此asyncProcess()即使在出现错误的情况下也会调用回调。消费者也没有结束。

我在这里阅读了核心手册https://vertx.io/docs/vertx-core/java/但我没有看到任何可以解决此问题的内容。

4

0 回答 0