我在使用 Alpakka AMQP 连接器和 Akka Streams 时遇到了一个非常奇怪的问题。
当我的 RabbitMQ 消息代理重新启动时,源似乎重新启动正常。但是,一旦重新启动,流永远不会完成,并且消息会丢失在流中更远的分区中。当我启动 AMQP 服务器时,我的 Akka 应用程序运行良好,但反过来一切都搞砸了。
这是我初始化我的方式AMQPSource
:
val amqpMessageSource = builder.add {
val amqpSource = AmqpSource(
NamedQueueSourceSettings(connectionDetails, amqpInMessageQueue).withDeclarations(queueDeclaration),
bufferSize = 10
).map { message =>
fromIncomingMessage(message)
}.initialDelay(5.seconds)
amqpSource.recoverWithRetries(-1, { case _ => amqpSource }) // Retry every 5 seconds an infinity of times
}
我试图删除发生问题的分区,将流直接发送到与我的示例相关的流,这甚至更奇怪:在这种情况下,AMQP 客户端甚至不再从 RabbitMQ 读取消息。
我显然在这里遗漏了一些东西,但我尝试了很多不同的东西,但根本没有解决我的问题。