问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
537 浏览

akka-stream - Alpakka KinesisSink:无法将消息推送到流

我正在尝试使用alpakka kinesis 连接器将消息发送到 Kinesis Stream,但我没有成功。我尝试了下面的代码,但我的流中没有任何内容。

  • 使用 aSink.foreach(println)而不是每 1 秒KinesisSink打印PutRecordsRequestEntry一次 => EXPECTED
  • 使用KinesisSink,条目仅生成一次。

我究竟做错了什么 ?

我正在用 a 检查我的流KinesisSource并且正在阅读(用另一个流测试)

此外,AWS Kinesis 的监控仪表板不显示任何 PUT 请求。

注1:我尝试启用alpakka的调试日志但没有效果

在我的logback.xml+ 根级调试中

0 投票
0 回答
127 浏览

scala - Scala alpaakka s3 与 DO Spaces 分段上传结果 403 连接

我正在尝试将 akka-http 文件上传到 Digital Ocean Spaces。我遇到了分段上传的问题。我使用 alpaakka 将文件直接流式传输到 s3。但是我在上传时收到 403 错误 SignatureDoesNotMatch。但是,如果我尝试使用简单的一部分上传相同的文件,则一切正常。

分段上传成功发起请求并且签名有效,但是当它尝试实际上传部分时,我得到 403。

有人用 DO 试过吗?我更改了不同的设置,我如何设置客户端(使用代理,使用端点),但结果仍然相同。

谢谢您的帮助。

0 投票
1 回答
683 浏览

scala - 使用 Slick (JDBC) Connector for Alpakka 时使用分页 SQL 语句是否有意义

我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何工作的——而且我真的无法使用 Docs 找到答案。

考虑一个用例,我想处理从数据库中选择的大量记录。我可以简单地SELECT * FROM [TABLE]在单个流中使用 a 吗,或者为每个页面(一个接一个)启动多个流是否有意义,例如SELECT * FROM [TABLE] LIMIT 0,1000.

我希望/认为 Slick Connector Alpakka 的响应式方式只负责在流需要时从数据库中获取记录,以便我可以使用SELECT * FROM [TABLE]...

谁能给我一些见解或一些好的文档来通读?

0 投票
1 回答
1091 浏览

scala - Akka Stream - 如何从多个 SQS 源流式传输

这是Akka Stream - Select Sink based on Element in Flow的后续文章。

假设我有多个要从中流式传输的 SQS 队列。我正在使用Alpakka的AWS SQS 连接器Source来创建.

现在,我想combine合并它们的来源。但是,Source.combine方法不支持将列表作为参数传递,而仅支持可变参数。

当然,我可以手指输入所有源参数。但是,如果我有 10 个源队列,参数会变得很长。

有没有办法从源列表中组合源?

[补充]

正如Ramon J Romero y Vigil所指出的,保持流“薄薄的一层”是一种更好的做法。然而,在这种特殊情况下,我使用 singlesqsClient进行所有SqsSource初始化。

0 投票
1 回答
752 浏览

xml - Alpakka:如何将 xml 反序列化为对象 - 反序列化器有更简洁的模式吗?

我有复杂的 XML 数据(它可能包含大量数据并且可能超过 15GB),这些数据具有复杂的性质和深层结构。我们需要对庞大的 XML 进行流处理。使用新的Alpakka库是我们的首选,因为它是一个很有前途的解决方案。

scala-xml 序列化其他 Scala 库中存在过时的线程,但我们需要将大量 XML 作为事件流进行处理。

为了简化事情,让我们假设我们有一个PurchaseOrder(XML 来自这个页面)。

我正在尝试从 XML流式传输所有Item并反序列化它们。请注意,相同的标签可能会出现在不同的级别上。此外, Item中的元素/属性可以以任意顺序出现。我看到的一种方法(主要基于Alpakka 的 XmlProcessingTest - 有人可以提出更好的参考吗?),可能如下所示:

该示例发布在Scastie

这种方法需要为每个标签(val 句柄:PartialFunction)提供大量处理程序,这可能容易出错且过于脆弱。

我想知道如何以更简洁的方式处理ParseEvent并将它们组合成所需的Item对象。任何建议如何避免样板代码?反序列化器有更简洁的模式吗?

0 投票
0 回答
185 浏览

apache-kafka - 您如何处理 akka 流中的下游故障?

我正在运行从 Kafka 读取的 akka 流,并且我想在文件序列化成功时将消息提交回 Kafka。但我不知道如何通知上游阶段下游失败。

现在,我用FanOutShape2[ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]], ConsumerRecord[Array[Byte], Array[Byte]], ConsumerMessage.CommittableOffsetBatch].

当下游接收器完成消耗所有每小时分组时,我想推送到偏移提交出口ConsumerRecords,但是当文件无法正确完成时,我想保留这些提交。

因此,鉴于这个简化的场景,假设我有以下流

这将以IOResult(0,Failure(java.nio.file.AccessDeniedException: /file-in-root-will-fail.txt)).

我如何通知上游阶段发生了这种情况?

0 投票
1 回答
399 浏览

mongodb - Alpakka MongoDB - 在 MongoSource 中指定类型

我目前正在使用 Akka Streams 和Alpakka MongoDB connector

是否可以指定类型MongoSource

我想做这样的事情:

但我收到以下错误:

我找不到关于这部分的正确文档。

0 投票
1 回答
4218 浏览

scala - 计算 Akka Streams 中的元素数量

我正在尝试通过 Alpakka将一个SourceScala 实体转换为一个Source,并计算初始流中的元素数量。您能否建议计算元素并将结果保留为的最佳方法:ByteStringCsvFormattinginitialSourceByteString Source

0 投票
1 回答
103 浏览

mongodb - Alpakka MongoDB - 覆盖 MongoSource 实现

我有与Alpakka MongoDB 类似的问题 - 在 MongoSource 中指定类型

所以我MongoSource在解决方案中实现了我自己的并使用了它:

但是ObservableToPublisherprivate class我收到以下错误:

如何解决这个问题?

0 投票
0 回答
57 浏览

alpakka - 为什么 Alpakka 不使用消息模式?

如果 Alpakka 旨在成为 Apache Camel 的替代品,那么为什么它不像所有其他集成项目那样发送某种形式的标准化消息对象,其中包含有效负载、标头、属性、MIME 类型等?(例如 Mule、Spring 集成)。似乎只是在各个流程步骤之间移动了一个裸有效负载。