问题标签 [alpakka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
akka - Akka Streams Kafka 错误处理 - 访问导致 Kafka Producer 出现问题的元素
例如,我看到这样的异常:
有没有办法了解更多关于导致此异常的 ProducerRecord 的信息?
在Supervisor
或recoverWith
我只有关于异常的信息。我不能用 包装任何东西try catch
,因为我使用的是内置的 Kafka Flow 或 Kafka Sink。可能我需要放弃这种集成并手动使用 Kafka 生产者,因为我看不到其他解决方案。
scala - 将案例类列表连接到 kafka 生产者?
我有以下案例类:
我正在尝试使用以下代码将这些案例类的列表连接到 kafka 中的生产者:
我正在使用 circe 将案例类转换为 json。但是我不断收到一个编译器错误说:
我不确定发生了什么事!
apache-kafka - How to create several partitions by Alpakka
I'm trying to create a simple producer which create a topic with some partitions provided by configuration.
According to Alpakka Producer Setting Doc any property from org.apache.kafka.clients.producer.ProducerConfig
can be set in kafka-clients
section. And, there is a num.partitions
property as commented in Producer API Doc .
Thus, I added that property to my application.conf
file as given below:
The producer application code is also given below:
But, this doesn't work. Producer creates a topic with a single partition instead of 3 partitions as I've set by configuration:
Finally, Kafkacat output is given below:
What is wrong? Is it possible to set properties from Kafka Producer API in kafka-clients
section using Alpakka?
scala - Consumer committableSource 和 plainSource 有什么区别?
我正在尝试使用消费者库https://doc.akka.io/docs/alpakka-kafka/current/consumer.html方法committableSource
如下:
这里的问题是,如何获取消费者收到的消息Kafka
?
使用以下代码片段有效:
整个代码片段:
还是我必须同时使用两者,一个用于提交,另一个用于消费。
akka - 带有临时队列的 Alpakka JMS 请求/响应 - 开箱即用?
考虑将一些基于 JMS 的遗留系统迁移到 Alpakka,代码中广泛使用的模式之一是使用临时队列 (JMSReplyTo) 的请求/响应。开箱即用的 Alpakka 是否可行?
akka - 在 Consumer API 中使用 createDrainingControl?
我正在阅读 Alpakka 中的 Consumer API for Kafka 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?
scala - 如何捕获 java.net.ConnectException:akka steram 上的连接被拒绝?
我有一个 kafka 消费者,如下所示:
kafka 服务器未激活,我只想终止消费者。它总是尝试连接并显示以下消息:
它还说:
如何赶上ConnectException
流中的内容并阻止消费者尝试连接 kafka。
scala - Google PubSub 可能存在编码问题
从 Alpakka PubSub 库运行订阅源时,我收到了可能的编码数据。
我从 PubSub 控制台发布消息。我希望我的测试消息会出现,但是在发布时test
我会收到dGVzdA==
. 这是预期的结果吗?我在导入私钥时遇到问题,这可能是它的结果吗?
消费者热切地与 Guice 绑定在一起。
akka - createDrainingControl 的目的是什么?
我正在阅读 Alpakka 的文档。在阅读 Kafka 消费者 API 时,我遇到了 createDrainingControl(),我想知道这个函数有什么用?我知道这是用来排空和停止流的,但是如果我们不断地消耗来自 Kafka MQ 的消息,为什么我们需要停止流呢?
akka-stream - 将来自不同主题的消息保存到 Alpakka 中的不同文件
我试图弄清楚如何将消息从订阅多个主题的 Kafka 消费者传递到基于主题的处理阶段(例如,将它们保存到特定文件或数据库等)。
有一个Consumer.externalCommittableSource
,但它需要手动选择分区,这是我想避免的。
一般来说,根据流元素的某些分组属性的值动态创建流和汇的正确方法是什么?