0

我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定的交换/主题时将其队列中的消息作为流使用。

在我的测试中一切似乎都运行良好,但是当我尝试启动我的服务时,我意识到我的流只消耗了我的消息一次然后退出。

基本上我使用的是 Alpakka 文档中的代码:

def consume()={
    val amqpSource = AmqpSource.committableSource(
      TemporaryQueueSourceSettings(connectionProvider, exchangeName)
        .withDeclaration(exchangeDeclaration)
        .withRoutingKey(topic),
      bufferSize = prefetchCount
    )

    val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))

    amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}

我试图安排consume()每秒执行一次,但我遇到OutOfMemoryException了问题。

是否有任何适当的方法可以使此代码作为无限循环运行?

4

1 回答 1

2

如果您想Source在失败或被取消时重新启动,请使用RestartSource.withBackoff.

于 2019-03-14T16:23:25.007 回答