0

我正在使用这个库:https ://doc.akka.io/docs/alpakka/current/sqs.html来使用 SQS。

我正在尝试使用它创建 SQS 长轮询,他们提供了一个用于从 SQS 读取消息的片段:

final CompletionStage<List<Message>> cs =
    SqsSource.create(
            queueUrl,
            SqsSourceSettings.create()
                .withWaitTime(Duration.ofSeconds(1)),
            sqsClient)
        .runWith(Sink.seq(), materializer);

我之前在 Scala 中使用过来自 Akka 的 RestartSource,但在 Java 中,我无法使这个轮询无限期。它在几分钟后停止。什么是保持轮询器存活的好方法?Java中还有其他选择吗?

4

1 回答 1

0

您可以通过使用 RestartSource 包装源来实现此目的:https://doc.akka.io/docs/akka/current/stream/stream-error.html#delayed-restarts-with-a-backoff-operator 在此,您只需在 maxRestart 上传递负值。文档对此非常清楚,我只是找不到 SQS 和本节。

于 2019-01-29T18:13:12.903 回答