我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定的交换/主题时将其队列中的消息作为流使用。
在我的测试中一切似乎都运行良好,但是当我尝试启动我的服务时,我意识到我的流只消耗了我的消息一次然后退出。
基本上我使用的是 Alpakka 文档中的代码:
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}
我试图安排consume()
每秒执行一次,但我遇到OutOfMemoryException
了问题。
是否有任何适当的方法可以使此代码作为无限循环运行?