问题标签 [akka-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 使用 reactive-kafka 有条件地处理消息
我一直在尝试使用 reactive-kafka,但我在条件处理方面遇到了问题,对此我没有找到令人满意的答案。
基本上我正在尝试使用一个包含大量消息(每天大约 100 亿条消息)的 kafka 主题,并且根据消息的某些属性只处理其中的一些消息(每天几千条),然后将我的消息的处理版本推送到另一个主题,我正在努力正确地做到这一点。
我的第一次尝试是这样的:
这种方法的问题在于,我只有在阅读我能够处理的消息时才会提交,这显然不是很酷,因为如果我必须停止并重新启动我的程序,那么我必须重新阅读一堆无用的消息,并且因为它们太多了,我不能那样做。
然后,我尝试通过以下方式使用 GraphDSL:
这个解决方案显然也不好,因为我无法处理的消息通过图的第二个分支并在可处理的消息真正推送到它们的目的地之前被提交,这比第一条消息更糟糕,因为它没有甚至保证至少一次交货。
有人知道我如何解决这个问题吗?
akka-stream - 如何对 Akka Streams 中的事件进行会话/分组?
要求是我想编写一个 Akka 流应用程序,它侦听来自 Kafka 的连续事件,然后根据嵌入在每个事件中的一些 id 值在一个时间范围内对事件数据进行会话化。
例如,假设我的时间框架窗口是两分钟,在前两分钟我得到以下四个事件:
输入:
然后在输出中,在对这些事件进行分组/会话化之后,根据它们的消息域值,我将只有两个事件。
输出:
我希望这能实时发生。关于如何实现这一目标的任何建议?
scala - Akka 在过滤后流式传输 kafka 提交偏移量
我正在尝试为 akka 流中的偏移量制定至少一次提交策略,但我无法理解在我的流上使用过滤器的情况下的预期模式是什么。
我的期望是,过滤后的消息都不会得到它们的偏移量,因此它们最终会陷入无限循环的处理中。
一个说明这一点的荒谬示例是过滤所有消息,如下所示:
我只能看到将过滤器包装在流程中的解决方案,以检查逻辑是否会在这种情况下过滤掉并提交,但这似乎并不优雅,并且降低了过滤器形状的价值。
过滤并不是一件罕见的事情,但我看不到任何优雅的提交偏移量的方式?对我来说,框架没有办法做到这一点似乎很奇怪,所以我错过了什么?
playframework - 与 websocket 连接时使用 akka-stream-kafka 从 kafka 主题获取最后一条消息
是否有可能使用 Akka Streams Kafka 获得关于 Kafka 主题的最后一条消息?我正在创建一个监听 Kafka 主题的 websocket,但目前它在我连接时检索所有先前的未红色消息。这可以加起来很多消息,所以我只对最后一条消息+任何未来消息感兴趣。(或只有未来的消息)
来源:
消费者设置:
我试过添加设置ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
哪个应该“自动将偏移重置为最新的偏移”,但它似乎没有任何效果。
java - 使用 Akka Kafka Streams 时不兼容的等式约束
我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:
这是 KafkaJacksonSerializer 的代码:
我不确定到底是什么问题。但是下面的代码没有显示任何错误:
有人可以帮我确定这里出了什么问题吗?
apache-kafka - AVRO 模式更新的问题
我有一个简单的案例类:
我添加字段“名称”
然后在 avro 模式(user.avsc)中添加此字段
此类用于其他案例类:
化学(auth_request.avsc)
在那之后改变我的消费者开始抛出异常
java.util.NoSuchElementException:在 scala.collection.immutable.Stream$Empty$.head(Stream.scala:1104) 在 scala.collection.immutable.Stream$Empty$.head(Stream.scala:1102) 的空流的头在 test.consumers.AuthRequestListener.$anonfun$new$2(AuthRequestListener.scala:39) 在 scala.util.Try$.apply(Try.scala:209) 在 test.consumers.AuthRequestListener.$anonfun$new$1(AuthRequestListener. scala:36) at test.consumers.AuthRequestListener.$anonfun$new$1$adapted(AuthRequestListener.scala:35) at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:51) at akka .stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519) at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482) at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter .scala:378) 在 akka。stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588) at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472) at akka.stream.impl.fusing.GraphInterpreterShell.processEvent( ActorGraphInterpreter.scala:563) at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745) at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$在 akka.actor.Actor.aroundReceive$(Actor.scala:515) 在 akka.actor.Actor.aroundReceive(Actor.scala:517) 在 akka.stream.impl.fuses 处接收 $1.applyOrElse(ActorGraphInterpreter.scala:760) .ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670) 在 akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) 在 akka.actor。ActorCell.invoke(ActorCell.scala:557) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox .scala:235) 在 akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 akka.dispatch.forkjoin.ForkJoinPool。在 akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 运行Worker(ForkJoinPool.java:1979)运行任务(ForkJoinPool.java:1339)在akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)在akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)运行任务(ForkJoinPool.java:1339)在akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)在akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
我试图清理构建并使缓存无效 - 我似乎在某些地方缓存了以前版本的架构,请帮助!
scala - avro4s 无法反序列化 AnyRef
我有一个简单的案例类
然后我想通过制作人将此发送到kafka主题我这样做
这运作良好
然后我尝试消费这个
这适用于有效载荷中的任何类。
但!如果是任何参考。消费者代码失败
错误:(38, 96) 找不到类型为 com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val 输入的证据参数的隐式值:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
错误:(38, 96) 二进制方法的参数不足:(隐含证据 $21:com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer],隐含证据 $22:com.sksamuel.avro4s.FromRecord[test.messages. KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]。未指定值参数证据 $22。val 输入:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
如果我声明隐含
它无法编译
错误:(58, 71) 找不到类型为 com.sksamuel.avro4s.FromValue[Object] 隐式值的惰性隐式值 fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
错误:(58, 71) 方法lazyConverter 的参数不足:(隐式 fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]。未指定值参数 fromValue。隐式验证 fromRecord:FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
如果添加每个隐含的编译器是必需的
编译失败并出现错误
宏扩展期间出现错误:(58, 69) 异常:java.lang.IllegalArgumentException:要求失败:需要一个案例类,但 Object 不在 com.sksamuel.avro4s 的 scala.Predef$.require(Predef.scala:277) 处。 FromRecord$.applyImpl(FromRecord.scala:283) 隐式验证 fromRecordObject: FromRecord[Object] = FromRecord[Object]
但是如果我为某个类替换 AnyRef - 不需要隐式,一切都会再次正常
scala - akka-Kafka(scala 框架)中是否有列出所有可用主题的功能
我正在尝试检查传递给我的“启动流”方法的主题是否是我的程序连接到的 Kafka 中有效/已经存在的主题。
我知道 Java 有 KafkaConsumer.ListTopics,但我正在使用 akka-kafka 库,而且 ConsumerSetting 似乎没有相同的方法。我可以使用 Kafka-topics --list 命令执行脚本以列出 kafka 主题的代码,但我更喜欢一种不那么笨拙的方式。
apache-kafka - Akka 流内部和显式缓冲区如何与 alpakka Kafka 中的底层 kafka 客户端设置交互?
我正在尝试使用 akka 流缓冲区来提高流的吞吐量,我想知道它如何应用于 Kafka
尤其是,
关于底层 Kafka API,这里到底发生了什么?
我在底层 Kafka 客户端上有以下配置:
因此我有一个MAX_POLL_RECORDS_CONFIG
,FETCH_MAX_BYTES_CONFIG
和MAX_PARTITION_FETCH_BYTES_CONFIG
我想知道的是缓冲区将如何相对于底层客户端上配置的获取进行播放。
- 是否
Consumer.committableSource
在其自己的 Actor 中实现,并通过其缓冲区从底层 Kafka 客户端接收消息?假设底层客户端被配置为获取多达一百万条消息,并且 Actor 作为1000
? 那是什么意思?会发生什么?Actor 缓冲区是否覆盖了 Kafka 客户端的轮询请求,或者它是否在其邮箱中获取 Kafka 客户端推送的数据,直到其轮询的结果(在底层客户端中配置的最大值)被传递通过?
我想我最需要知道 Kafka 流的内部和/或显式缓冲区如何与轮询请求的设置进行交互。
apache-kafka - Akka 流式处理 Kafka 消费者进程并行
我正在使用 Akka Kafka 连接器开发 Kafka 消费者应用程序。我希望消费者并行处理消息。我应该选择哪个消费群体选择消费者?如何在消费者端配置并行度?