问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
264 浏览

akka - Akka Streams Kafka 错误处理 - 访问导致 Kafka Producer 出现问题的元素

例如,我看到这样的异常:

有没有办法了解更多关于导致此异常的 ProducerRecord 的信息?

SupervisorrecoverWith我只有关于异常的信息。我不能用 包装任何东西try catch,因为我使用的是内置的 Kafka Flow 或 Kafka Sink。可能我需要放弃这种集成并手动使用 Kafka 生产者,因为我看不到其他解决方案。

0 投票
1 回答
128 浏览

scala - 将案例类列表连接到 kafka 生产者?

我有以下案例类:

我正在尝试使用以下代码将这些案例类的列表连接到 kafka 中的生产者:

我正在使用 circe 将案例类转换为 json。但是我不断收到一个编译器错误说:

我不确定发生了什么事!

0 投票
2 回答
241 浏览

apache-kafka - How to create several partitions by Alpakka

I'm trying to create a simple producer which create a topic with some partitions provided by configuration.

According to Alpakka Producer Setting Doc any property from org.apache.kafka.clients.producer.ProducerConfig can be set in kafka-clients section. And, there is a num.partitions property as commented in Producer API Doc .

Thus, I added that property to my application.conf file as given below:

The producer application code is also given below:

But, this doesn't work. Producer creates a topic with a single partition instead of 3 partitions as I've set by configuration:

Finally, Kafkacat output is given below:

What is wrong? Is it possible to set properties from Kafka Producer API in kafka-clients section using Alpakka?

0 投票
1 回答
778 浏览

scala - Consumer committableSource 和 plainSource 有什么区别?

我正在尝试使用消费者库https://doc.akka.io/docs/alpakka-kafka/current/consumer.html方法committableSource如下:

这里的问题是,如何获取消费者收到的消息Kafka

使用以下代码片段有效:

整个代码片段:

还是我必须同时使用两者,一个用于提交,另一个用于消费。

0 投票
1 回答
54 浏览

akka - 带有临时队列的 Alpakka JMS 请求/响应 - 开箱即用?

考虑将一些基于 JMS 的遗留系统迁移到 Alpakka,代码中广泛使用的模式之一是使用临时队列 (JMSReplyTo) 的请求/响应。开箱即用的 Alpakka 是否可行?

0 投票
1 回答
314 浏览

akka - 在 Consumer API 中使用 createDrainingControl?

我正在阅读 Alpakka 中的 Consumer API for Kafka 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?

0 投票
3 回答
737 浏览

scala - 如何捕获 java.net.ConnectException:akka steram 上的连接被拒绝?

我有一个 kafka 消费者,如下所示:

kafka 服务器未激活,我只想终止消费者。它总是尝试连接并显示以下消息:

它还说:

如何赶上ConnectException流中的内容并阻止消费者尝试连接 kafka。

代码托管在这里https://gitlab.com/akka-samples/kafkaconsumer

0 投票
1 回答
461 浏览

scala - Google PubSub 可能存在编码问题

从 Alpakka PubSub 库运行订阅源时,我收到了可能的编码数据。

我从 PubSub 控制台发布消息。我希望我的测试消息会出现,但是在发布时test我会收到dGVzdA==. 这是预期的结果吗?我在导入私钥时遇到问题,这可能是它的结果吗?

消费者热切地与 Guice 绑定在一起。

0 投票
1 回答
82 浏览

akka - createDrainingControl 的目的是什么?

我正在阅读 Alpakka 的文档。在阅读 Kafka 消费者 API 时,我遇到了 createDrainingControl(),我想知道这个函数有什么用?我知道这是用来排空和停止流的,但是如果我们不断地消耗来自 Kafka MQ 的消息,为什么我们需要停止流呢?

0 投票
1 回答
32 浏览

akka-stream - 将来自不同主题的消息保存到 Alpakka 中的不同文件

我试图弄清楚如何将消息从订阅多个主题的 Kafka 消费者传递到基于主题的处理阶段(例如,将它们保存到特定文件或数据库等)。

有一个Consumer.externalCommittableSource,但它需要手动选择分区,这是我想避免的。

一般来说,根据流元素的某些分组属性的值动态创建流和汇的正确方法是什么?