问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
843 浏览

scala - 使用流的内容更改源中的具体化值

Alpakka 提供了一种访问数十种不同数据源的好方法。面向文件的源(例如 HDFS 和 FTP 源)以Source[ByteString, Future[IOResult]. 但是,通过 Akka HTTP 的 HTTP 请求是作为Source[ByteString, NotUsed]. 在我的用例中,我想从 HTTP 源中检索内容,Source[ByteString, Future[IOResult]这样我就可以构建一个统一的资源获取器,它适用于多种方案(在本例中为 hdfs、文件、ftp 和 S3)。

特别是,我想将Source[ByteString, NotUsed]源 转换为Source[ByteString, Future[IOResult]能够从传入字节流计算 IOResult 的位置。有很多类似的方法flatMapConcatviaMat但似乎没有一个能够从输入流中提取细节(例如读取的字节数)或IOResult正确初始化结构。理想情况下,我正在寻找一种具有以下签名的方法,该方法将在流进入时更新 IOResult。

0 投票
1 回答
267 浏览

rabbitmq - 当背压启动时,在rabbitmq中获取生产者的失败回调

我想使用一些回调 api 为我的 rabbitmq 生产者找出失败的消息。我已经用 [{rabbit, [{vm_memory_high_watermark, 0.001}]}] 配置了 rabbitmq。并尝试推送大量消息,但所有消息都被接受,并且稍后会出现 TimeoutException 并且消息没有发送到 Queue enter code here,请告诉我如何捕获它。

发送消息代码:

0 投票
1 回答
186 浏览

java - 使用 Alpakka Java 进行无限长轮询

我正在使用这个库:https ://doc.akka.io/docs/alpakka/current/sqs.html来使用 SQS。

我正在尝试使用它创建 SQS 长轮询,他们提供了一个用于从 SQS 读取消息的片段:

我之前在 Scala 中使用过来自 Akka 的 RestartSource,但在 Java 中,我无法使这个轮询无限期。它在几分钟后停止。什么是保持轮询器存活的好方法?Java中还有其他选择吗?

0 投票
2 回答
976 浏览

scala - 如何使用 Embedded-Kafka-lib 测试 Kafka Consumer,完全使用“withRunningKafka”?

我应该测试我的代码,以便通过嵌入式“withRunningKafka”使用来自 kafka-server 的所有消息,如下所示:https ://github.com/manub/scalatest-embedded-kafka

  1. 我试图通过创建的嵌入式生产者向主题发送消息。
  2. 而且我尝试通过项目中的代码使用生成的消息(由嵌入式生产者创建)。

“使用自定义生产者和消费者进行测试”应该{

问题就在这里:'ok' 并没有给出'Done' 的结果。一般来说,我测试消费者的逻辑是否正确?

0 投票
2 回答
702 浏览

elasticsearch - 使用 Akka Streams,我如何知道源何时完成?

我有一个 Alpakka Elasticsearch Sink,我在请求之间保留了它。当我收到请求时,我Source从 HTTP 请求创建一个并将其转换Source为 ElasticsearchWriteMessage的一个,然后使用mySource.runWith(theElasticseachSink).

  1. 来源完成后如何收到通知?似乎没有什么有用的东西被实现。
  2. 源的完成是否会传递给接收器,这意味着我每次都必须创建一个新的?
  3. 如果以上是肯定的,会在Flow.fromSourceAndSink帮助下以某种方式将它们解耦吗?

我的目标是知道 HTTP 下载何时完成(包括via它所经过的 s)并能够重用接收器。

0 投票
1 回答
136 浏览

amazon-kinesis - 将凭证添加到 AmazonKinesisAsyncClientBuilder

在 StackOverflow ( link ) 的其他地方,对 AWS S3 的访问是这样完成的:

我想将此模式用于AmazonKinesisAsyncClientBuilder,但没有任何方法可以设置区域或凭据。

即这不能编译:

我正在从自定义文件中引入信誉(AWS 中的正常情况);AmazonKinesisAsyncClientBuilder仅适用于默认值吗?

使用适用于 Java 的 AWS 开发工具包 2.5.1

计划通过AlpakkaKinesis阅读 Kinesis ,但这可能不相关。

0 投票
1 回答
1848 浏览

scala - 以编程方式停止 Alpakka Kafka 流的正确方法

我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来消费服务中的事件流。为了处理事件处理错误,我们使用 Kafka 自动提交和多个队列。例如,如果我们有user_created想要从产品服务中消费的主题,我们也会创建user_created_for_products_faileduser_created_for_products_dead_letter。这两个额外的主题与特定的 Kafka 消费者组耦合。如果一个事件处理失败,它会进入失败队列,我们​​会在五分钟内再次尝试消费——如果再次失败,它就会进入死信。

在部署时,我们希望确保我们不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些“飞行”的事件都尚未处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。

阅读文档后,我们已经看到了该KillSwitch功能。我们在其中看到的问题是该shutdown方法按我们的预期Unit返回。Future[Unit]我们不确定使用它是否会丢失事件,因为在测试中它看起来太快而无法正常工作。

ActorSystem作为一种解决方法,我们为每个流创建一个并使用该terminate方法(返回 a Future[Terminate])。这个解决方案的问题是,我们认为创建ActorSystem每个流不会很好地扩展,并且terminate需要很长时间才能解决(在测试中,关闭最多需要一分钟)。

你遇到过这样的问题吗?是否有更快的方法(与 相比ActorSystem.terminate)来停止流并确保Source已处理所有已发出的事件?

0 投票
1 回答
180 浏览

akka-stream - 使用 Alpakka 的无限 AMQP 消费者

我正在尝试使用 Alpakka 实现一个连接到 AMQP 代理的非常简单的服务。我只是希望它在将消息推送到给定的交换/主题时将其队列中的消息作为流使用。

在我的测试中一切似乎都运行良好,但是当我尝试启动我的服务时,我意识到我的流只消耗了我的消息一次然后退出。

基本上我使用的是 Alpakka 文档中的代码:

我试图安排consume()每秒执行一次,但我遇到OutOfMemoryException了问题。

是否有任何适当的方法可以使此代码作为无限循环运行?

0 投票
0 回答
294 浏览

apache-kafka - Akka-stream-kafka 消费者组中增加消费者触发重新平衡,由于撤销分区而导致 CommitFailedException

我认为这个问题与#539有关,但我不知道这是一个错误,还是用户应该自己处理。

所以我有一个消费者组,每当我增加该组中的消费者数量时,撤销分区就会导致以下错误:

当我减少消费者数量时,这不会发生。我的意思是到目前为止我还没有观察到这一点。我认为这是因为分区不会在缩减时撤销。剩下的消费者只是得到新的分区。

请注意,我做群组消息并提交批量事情。

这是我的代码的样子

我的决策者只是执行以下操作:

因此,根据我对 #539 的阅读,我了解到我有许多机上消息要提交,但由于撤销,我不能提交。也就是说,当消费者数量增加时,会发生一些涉及撤销的再平衡。

我的服务至少一次,所以我不介意其他消费者是否重新处理这些消息。我们没有最多一个交付约束。

我的问题是,直到图书馆本机处理这些情况,每当撤销发生或更好时,我怎么能继续提交它们,只是丢弃它们,所以被分配了它们所属分区的消费者将重新处理它们。

有什么建议吗?我检查了 BalanceListener,但我不确定如何在这种情况下使用它。

注意我的超时配置

0 投票
2 回答
467 浏览

scala - Sqs Akka Stream 内存不足

下面的代码在运行 15 分钟内在 EC2 实例上引发 OOO(java config xms 1024 xmx2G),但在 intellij 上运行时不会引发任何错误。

我用 1.0-M3 和 1.0-RC1 都试过了。有解决办法吗?

使用 jhat 的前 5 个对象创建直方图 -

我也在这里发现了类似的问题 - https://github.com/akka/alpakka/issues/1588

我想知道是否有一些替代方案可以解决这个问题。