问题标签 [reactive-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 反应式 Kafka:为不存在的主题创建流时没有错误
我们最近切换到 Reactive Kafka。如果在为主题创建流时主题存在,kafka 似乎不会抛出任何错误。为什么会这样?
我知道一个主题的存在可以用AdminUtils.topicExists(zkClient, topicName)
. 有没有办法强制响应式 kafka 在创建流之前检查主题是否存在?
scala - 添加用于写入 Kafka 订阅者的流
我需要构建以下图表:
但是试图在 Reactive Kafka 中实现它让我陷入了困境。这似乎是错误的,因为这让我觉得这是一个相对常见的用例:我想在 Kafka 主题之间移动数据,同时保证至少一次交付语义。
现在并行写完全没问题
此代码有效,因为writeToTopic2
可以使用 实现ReactiveKafka#publish(..)
,它返回一个Sink
. 但是当我的应用程序崩溃时,我会失去 ALOS 保证和数据。
所以我真正需要的是编写一个写入 Kafka 主题的 Flow。我尝试过使用Flow.fromSinkAndSource(..)
自定义GraphStage
,但遇到了流经数据的类型问题;例如,提交的内容commitOffsetForTopic1
不应包含在 中writeToTopic2
,这意味着我必须一直保留一个包含两条数据的包装器对象。但这与writeToTopic2
接受ProducerMessage[K,V]
. 我最近解决这个问题的尝试遇到了反应式 kafka 库中的私有和最终类(扩展/包装/替换底层 SubscriptionActor)。
我真的不想维护一个分叉来实现这一点。我错过了什么?为什么这么难?我是否以某种方式试图构建一个病态图节点,或者这个用例是一个疏忽……还是我在我一直在挖掘的文档和源代码中以某种方式遗漏了一些完全明显的东西?
当前版本是 0.10.1。我可以根据要求添加更多关于我的许多尝试的详细信息。
akka-stream - 将生产者流连接到图
我是使用 akka 流 kafka (和一般的 akka 流)的新手。我正在尝试构建一个图表,以便将消息发布到不同的主题。如何将生产者连接为流以提交已处理的消息?我尝试使用 Producer.flow 但我无法获得commitScaladsl
akka - Async 和 mapAsync - 在 akka 流中实现并行性 - 反应式 kafka
我目前正在使用 akka 流并构建一个反应式应用程序。到目前为止,我的理解async
是设定边界时,
每个阶段(流程)都在一个演员中运行。真的吗?
调用mapAsync
类似于应用async
到流?
理想情况下,我会有一堆图表,并希望每个图表在一个演员中并行运行,这有助于扩展。
附加信息:
我正在用反应式 kafka 实现一些东西,想知道 kafka 主题消耗是如何负载平衡的。即:如果我有 20 个分区,如果我正在使用plainPartitionSource
并构建一个图形,它会创建那么多消费者参与者吗?
如果将有 20 个消费者参与者(等于它正在消费的特定主题的 20 个分区),那么理想情况下会有 20 个图。将图表具体化,我希望它在不同的参与者中运行。
简而言之,我想要完全并行,这取决于分区的数量,从消费到处理它。
希望我的问题很清楚。
编辑:
scala - 如何确保不断生成 Avro 模式并避免“为 x 创建太多模式对象”异常?
我在使用响应式 kafka和avro4s生成 Avro 消息时遇到可重现的错误。一旦达到identityMapCapacity
客户端 ( CachedSchemaRegistryClient
) 的值,序列化就会失败
这是出乎意料的,因为所有消息都应该具有相同的模式——它们是相同案例类的序列化。
序列化器是 a KafkaAvroSerializer
,用 a 实例化new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)
生成GenericRecord
:
架构是在代码深处创建的(io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema
,由 调用io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl
),我对其没有影响,所以我不知道如何修复泄漏。在我看来,这两个融合项目不能很好地协同工作。
我目前的两种解决方法是:
- 不使用模式注册表——显然不是一个长期的解决方案
- 创建
SchemaRegistryClient
不依赖对象身份的自定义 - 可行但我想避免创建更多问题而不是重新实现
有没有办法根据消息/记录类型生成或缓存一致的模式并将其与我的设置一起使用?
scala - 如何将akka流kafka(reactive-kafka)集成到akka http应用程序中?
我有一个基本的 scala akka http CRUD 应用程序。请参阅下面的相关课程。
例如,每当创建/更新实体时,我只想将实体 ID 和一些数据(作为 json)写入 Kafka 主题。
我正在查看http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但对 scala 和 akka 是新手,不确定如何将其集成到我的应用程序中?
例如,从上面的文档中,这是生产者写信给 kafka 的示例,所以我认为我需要类似的东西,但是我的应用程序中的下落应该去哪里?创建用户后,我可以在我的服务的 create 方法中添加另一个地图调用吗?
非常感谢!
或者我是否需要在我的 Server.scala 的 bindAndHandle() 方法中执行类似https://github.com/hseeberger/accessus的示例?
WebServer.scala
路由器.scala
服务.scala
回购协议
akka-stream - 并行使用 Akka Streams Kafka Producer
我想并行化对 kafka 的写入,即有多个生产者向 kafka 发送数据,尽管是在 akka 流中。在其他情况下,我的流将有几个来自源的初始阶段,然后在发送数据时,我希望有大约 16 个工作人员同时发送数据。
我想知道是否需要将 Akka Streams Kafka 嵌入到 akka-stream Graph DSL 中并为此使用平衡器,或者是否有更简单的解决方案。此外,简单地说,如果有人做了类似的事情,那会很棒。
scala - 找到 java.util.concurrent.Future 需要 scala.concurrent.Future
相关:java.util.concurrent.Future 的 scala.concurrent.Future 包装器
这来自我的另一个问题:
如何将akka流kafka(reactive-kafka)集成到akka http应用程序中?
我有一个 AKKA HTTP 应用程序,我想在我的路由中的 onComplete 函数中向 Kafka 发送消息/ProducerRecord,如下所示:
但是,onComplete(producer send producerRecord)正在生成以下类型不匹配错误:
[错误] 发现:Future[org.apache.kafka.clients.producer.RecordMetadata](在 java.util.concurrent 中)[错误] 需要:Future[org.apache.kafka.clients.producer.RecordMetadata](在 scala.并发)[错误] onCompleteRecordMetadata { _ =>
有没有办法解决这个问题,也许通过使用 Producer 作为接收器(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink)而不是java producer.send函数?
scala - Akka Streams Reactive Kafka - OutOfMemoryError under high load
I am running an Akka Streams Reactive Kafka application which should be functional under heavy load. After running the application for around 10 minutes, the application goes down with an OutOfMemoryError
. I tried to debug the heap dump and found that akka.dispatch.Dispatcher
is taking ~5GB of memory. Below are my config files.
Akka version: 2.4.18
Reactive Kafka version: 2.4.18
1.application.conf
:
2.build.sbt
:
3.Source
and Sink
actors:
In this case actorA
, actorB
, and actorC
are doing some business logic processing and database interaction. Is there anything I am missing in handling the Akka Reactive Kafka consumers such as commit, error, or throttling configuration? Because looking into the heap dump, I could guess that the messages are piling up.
scala - 等效于 kafkaConsumer.partitionsFor 使用响应式 kafka(akka kafka 流)
我正在使用响应式卡夫卡(akka kafka 流):
https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html
使用reactive-kafka的以下代码的等价物是什么?