问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
227 浏览

scala - Akka:如何在一个图形阶段提取一个值并在下一个阶段使用它

我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆 CSV 文件必须添加到同一个流中,因此我想添加一个包含来自文件名或请求的信息的字段。目前我有这样的事情:

它流式传输 ByteStrings(字段)的列表(行)序列。目标是这样的:

创建类似于以下的结构:

其中文件名是原始源中不存在的字段。

我认为在流中注入值看起来不太漂亮,而且最终我想确定在读取目录的流阶段传递给解析的文件名。

通过流阶段传递值以供以后重用的正确/规范方法是什么?

0 投票
0 回答
145 浏览

java - 如何在 Alpakka 中使用 Source.queue

我正在尝试为可以多次使用的 JMS 队列创建一个生产者;即,我不想在每次发送消息时都创建与队列的连接。我想要一个打开连接的演员,每次收到消息时,它都使用相同的流程。

演员初始化

演员 onMessage

0 投票
1 回答
913 浏览

scala - 使用 Slick 处理大表失败并出现 OutOfMemoryError

我正在使用 Akka Streams 和 Slick 查询一个大型 MySQL 表,但它以OutOfMemoryError. 似乎 Slick 正在将所有结果加载到内存中(如果查询仅限于几行,它不会失败)。为什么会出现这种情况,解决方案是什么?

0 投票
1 回答
172 浏览

elasticsearch - ES 中带有 Alpakka 索引的 Akka Streams:索引名称仅在开始执行时评估

我已经使用 Akka Streams 和 Alpakka 编写了一些代码,这些代码从 Amazon SQS 读取并索引 Elasticsearch 中的事件。一切都很顺利,性能也很棒,但我对索引名称有疑问。我有这个代码:

问题是在运行流程几天后,索引名称没有改变。我想 Akka Streams 在幕后创建了一个融合的演员,并且index获取索引名称的函数仅在执行开始时进行评估。

知道如何根据当前日期使用索引名称对 ES 中的事件进行索引吗?

0 投票
1 回答
97 浏览

scala - 如何定义 AmqpSource 订阅多个交易所?

现在我正在使用

我希望能够订阅多个交换。谁能帮我这个?

0 投票
1 回答
578 浏览

scala - Google Pub/Sub 订阅者未收到消息

首先,我对 Akka 没有经验,所以我自己调试它真的很糟糕。我尝试了此处的示例,并且发布消息有效(这意味着凭据有效),但没有发出任何消息。服务帐户被授予所有权限。

我的代码看起来像这样,基本上和例子中的一模一样:

我发现它akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic从未被执行,这似乎是没有获取消息的原因。

0 投票
1 回答
60 浏览

scala - Alpakka S3Client 抛出 OptionVal$.contains NoSuchMethodError

我正在尝试使用Alpakka S3 连接器

当我实例化 S3Client 时,我得到了异常

它发生在

}

在 akka 流中(2.11 与 2.5.12)。变量 outPort 的类型为 OptionVal。

akka-actor_2.11:2.2.20中util包的OptionVal中确实不存在该方法。在akka 演员存储库中也是如此。但在这里它被记录为存在。

为什么这些方法不存在?我错过了依赖吗?我用了

0 投票
1 回答
447 浏览

java - 使用 Akka Kafka Streams 时不兼容的等式约束

我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:

但是上面的代码在 runwith() 处显示编译器错误: 在此处输入图像描述

这是 KafkaJacksonSerializer 的代码:

我不确定到底是什么问题。但是下面的代码没有显示任何错误:

有人可以帮我确定这里出了什么问题吗?

0 投票
4 回答
1785 浏览

scala - Akka Stream Kafka:找不到关键“kafka-clients”的配置设置

我正在尝试使用Alpakka Kafka 连接器 (Akka Stream Kafka)创建一个简单的原型。

运行应用程序时出现以下错误:

我有以下代码./src/main/scala/App.scala

以下build.sbt

我使用sbt run. 我还没有配置任何 uber/assembly jar。

我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。

更新

正如@terminally-chill 调用ProducerSettings(system, new StringSerializer, new StringSerializer)(传递ActorSystem而不是配置)所建议的那样解决问题。我只是不明白这是设计使然还是错误。

更新 2

我创建了一个已经修复的github 问题。现在文档更准确,并解释了创建ProducerSettings/的正确方法ConsumerSettings

或者你可以通过ActorSystem上面的解释。

0 投票
1 回答
653 浏览

akka - Akka 流的 S3 连接器问题 - Alpakka

我们正在使用 Alpakka s3 连接器从 VPC 内的本地系统连接到 s3 存储桶并出现如下错误,如果我们使用我们的传统 aws 客户端库,我们能够连接到 s3 并下载文件,我还附上了我们正在使用的示例代码用于 alpakka s3 连接器。这个错误是因为我必须在代码中设置一些 VPC 代理,我用它来处理我们的传统 aws s3 库,但我没有看到 alpakka 提供设置我的 VPC 代理的选项?

错误 - akka.stream.StreamTcpException: Tcp 命令 [Connect(bucket-name.s3.amazonaws.com:443,None,List(),Some(10 seconds),true)] 由于连接超时 Some(10 seconds) 而失败) 已到期

代码 -