4

我在使用 Alpakka AMQP 连接器和 Akka Streams 时遇到了一个非常奇怪的问题。

当我的 RabbitMQ 消息代理重新启动时,源似乎重新启动正常。但是,一旦重新启动,流永远不会完成,并且消息会丢失在流中更远的分区中。当我启动 AMQP 服务器时,我的 Akka 应用程序运行良好,但反过来一切都搞砸了。

这是我初始化我的方式AMQPSource

val amqpMessageSource = builder.add {
  val amqpSource = AmqpSource(
    NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
    bufferSize = 10
  ).map { message =>
    fromIncomingMessage(message)
  }.initialDelay(5.seconds)
  amqpSource.recoverWithRetries(-1, { case _ => amqpSource }) // Retry every 5 seconds an infinity of times
}

我试图删除发生问题的分区,将流直接发送到与我的示例相关的流,这甚至更奇怪:在这种情况下,AMQP 客户端甚至不再从 RabbitMQ 读取消息。

我显然在这里遗漏了一些东西,但我尝试了很多不同的东西,但根本没有解决我的问题。

4

0 回答 0