问题标签 [alpakka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka:如何在一个图形阶段提取一个值并在下一个阶段使用它
我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆 CSV 文件必须添加到同一个流中,因此我想添加一个包含来自文件名或请求的信息的字段。目前我有这样的事情:
它流式传输 ByteStrings(字段)的列表(行)序列。目标是这样的:
创建类似于以下的结构:
其中文件名是原始源中不存在的字段。
我认为在流中注入值看起来不太漂亮,而且最终我想确定在读取目录的流阶段传递给解析的文件名。
通过流阶段传递值以供以后重用的正确/规范方法是什么?
java - 如何在 Alpakka 中使用 Source.queue
我正在尝试为可以多次使用的 JMS 队列创建一个生产者;即,我不想在每次发送消息时都创建与队列的连接。我想要一个打开连接的演员,每次收到消息时,它都使用相同的流程。
演员初始化
演员 onMessage
scala - 使用 Slick 处理大表失败并出现 OutOfMemoryError
我正在使用 Akka Streams 和 Slick 查询一个大型 MySQL 表,但它以OutOfMemoryError
. 似乎 Slick 正在将所有结果加载到内存中(如果查询仅限于几行,它不会失败)。为什么会出现这种情况,解决方案是什么?
elasticsearch - ES 中带有 Alpakka 索引的 Akka Streams:索引名称仅在开始执行时评估
我已经使用 Akka Streams 和 Alpakka 编写了一些代码,这些代码从 Amazon SQS 读取并索引 Elasticsearch 中的事件。一切都很顺利,性能也很棒,但我对索引名称有疑问。我有这个代码:
问题是在运行流程几天后,索引名称没有改变。我想 Akka Streams 在幕后创建了一个融合的演员,并且index
获取索引名称的函数仅在执行开始时进行评估。
知道如何根据当前日期使用索引名称对 ES 中的事件进行索引吗?
scala - 如何定义 AmqpSource 订阅多个交易所?
现在我正在使用
我希望能够订阅多个交换。谁能帮我这个?
scala - Google Pub/Sub 订阅者未收到消息
首先,我对 Akka 没有经验,所以我自己调试它真的很糟糕。我尝试了此处的示例,并且发布消息有效(这意味着凭据有效),但没有发出任何消息。服务帐户被授予所有权限。
我的代码看起来像这样,基本上和例子中的一模一样:
我发现它akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic
从未被执行,这似乎是没有获取消息的原因。
scala - Alpakka S3Client 抛出 OptionVal$.contains NoSuchMethodError
我正在尝试使用Alpakka S3 连接器
当我实例化 S3Client 时,我得到了异常
它发生在
}
在 akka 流中(2.11 与 2.5.12)。变量 outPort 的类型为 OptionVal。
akka-actor_2.11:2.2.20中util包的OptionVal中确实不存在该方法。在akka 演员存储库中也是如此。但在这里它被记录为存在。
为什么这些方法不存在?我错过了依赖吗?我用了
java - 使用 Akka Kafka Streams 时不兼容的等式约束
我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:
这是 KafkaJacksonSerializer 的代码:
我不确定到底是什么问题。但是下面的代码没有显示任何错误:
有人可以帮我确定这里出了什么问题吗?
scala - Akka Stream Kafka:找不到关键“kafka-clients”的配置设置
我正在尝试使用Alpakka Kafka 连接器 (Akka Stream Kafka)创建一个简单的原型。
运行应用程序时出现以下错误:
我有以下代码./src/main/scala/App.scala
:
以下build.sbt
:
我使用sbt run
. 我还没有配置任何 uber/assembly jar。
我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。
更新
正如@terminally-chill 调用ProducerSettings(system, new StringSerializer, new StringSerializer)
(传递ActorSystem
而不是配置)所建议的那样解决问题。我只是不明白这是设计使然还是错误。
更新 2
我创建了一个已经修复的github 问题。现在文档更准确,并解释了创建ProducerSettings
/的正确方法ConsumerSettings
。
或者你可以通过ActorSystem
上面的解释。
akka - Akka 流的 S3 连接器问题 - Alpakka
我们正在使用 Alpakka s3 连接器从 VPC 内的本地系统连接到 s3 存储桶并出现如下错误,如果我们使用我们的传统 aws 客户端库,我们能够连接到 s3 并下载文件,我还附上了我们正在使用的示例代码用于 alpakka s3 连接器。这个错误是因为我必须在代码中设置一些 VPC 代理,我用它来处理我们的传统 aws s3 库,但我没有看到 alpakka 提供设置我的 VPC 代理的选项?
错误 - akka.stream.StreamTcpException: Tcp 命令 [Connect(bucket-name.s3.amazonaws.com:443,None,List(),Some(10 seconds),true)] 由于连接超时 Some(10 seconds) 而失败) 已到期
代码 -