问题标签 [akka-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Lagom 主题订阅者 - 如何在未来异常中重试?
我在lagom
下面有一个主题订阅者
订阅这个主题,我使用atLeastOnce
作为方法,所以如果有任何异常,我希望流程重新启动/重试。当我抛出一个正常的异常时,它可以继续正常重试
但是当将来发生异常时,无论我如何尝试,Flow都不会重新启动。这是我将来处理异常的尝试之一
它会显示一个异常,但Flow
不会自动重新启动它。我应该如何处理/抛出异常Future
?
akka - 这个 Akka Kafka Stream 配置是否受益于 Akka Streams 的背压机制?
我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送到 Akka Actor。我不确定我的编程方式是否使用了 Akka Streams 中内置的背压机制的所有好处。
以下是我的配置...
这符合我对业务案例的期望,myActor 收到命令更新(MyAvro)
我对背压的技术概念更加恼火,据我所知,背压机制部分由接收器控制,但在这个流配置中,我的接收器只是“Sink.ignore”。所以我的 Sink 正在为 Back Pressure 做任何事情。
当 Akka Kafka Stream 提交 Kafka Topic 偏移量时,我还很好奇什么?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理诸如询问模式之类的场景,Kafka Offset 不应该在询问模式完成之前提交。
我看到一些处理手动偏移控制“plainPartitionedManualOffsetSource”、“commitablePartitionManualOffsetSource”的工厂方法,但我找不到任何示例,我可以用我的业务逻辑决定手动提交偏移吗?
作为替代配置,我可以使用类似的东西。
现在我可以访问 Sink.actorRef,我认为 Back Pressure 机制有机会控制 Back Pressure,自然这段代码将不起作用,因为我不知道如何在这个星座下访问“myAvro”。
谢谢回答..
.net-core - 对象数组的 Avro 模式定义
我是 Kafka 和 Avro 序列化和反序列化的新手。我在 .NET 中有以下类定义
我有多个生产者,它们为网关产生不同类型的请求。每个请求都会有不同的请求参数,这些参数将根据&object[]
的组合在中定义。有一个消费者与网关连接的系统交互,通过它我们可以向网关发送请求。ServiceId
HandlerId
现在我想.avsc
为上述数据结构以文件的形式生成 Avro 模式定义,以便所有 GWRequest 必须由生产者在单个主题上发布。我找不到object[] Requestparams
在 Avro 中定义的任何线索。
谢谢你的帮助。
java - 快速处理主题和慢速处理主题 - Akka Kafka
我有一个问题,我需要优先处理一些事件以便更早处理,而一些事件可以在高优先级事件之后说。这些事件来自一个源,我需要根据它们的事件类型优先级对流进行优先级排序,以便在高优先级或低优先级接收器中转发。我正在使用 kafka 和 akka kafka 流。所以主要问题是我在给定的时间点获得了大量的流量。这里的首选方案是什么?
kafka-consumer-api - 当我们的 Kafka 分区中存在延迟时,Akka Kafka Consumer 处理率会急剧下降
我们正面临这样一种情况,即我们的 akka-stream-kaka-consumer 处理率在出现延迟时会下降。当我们在分区中没有任何延迟的情况下启动它时,处理速度会突然增加。
MSK 集群 - 10 个主题 - 每个 40 个分区 => 400 个总领导分区
为了在系统中实现高吞吐量和并行性,我们实现了 akka-stream-kafka 消费者,分别订阅每个主题分区,从而在消费者和分区之间实现 1:1 映射。
这是消费者设置:
- ec2 服务实例数 - 7
- 每个服务为 10 个主题中的每个主题启动 6 个消费者,从而导致每个服务实例有 60 个消费者。
- 总消费者 = 实例数 (7) * 每个服务实例上的消费者数 (60) = 420
因此,我们总共启动了 420 个消费者,分布在不同的实例中。根据 RangeAssignor 分区策略(默认一个),每个分区将分配给不同的消费者,400 个消费者将使用 400 个分区,20 个消费者将保持未使用状态。我们已经验证了这个分配并且看起来不错。
使用的实例类型: c5.xlarge
MSK 配置:
阿帕奇卡夫卡版本- 2.4.1.1
经纪人总数- 9(分布在 3 个可用区)
经纪人类型: kafka.m5.large
每个区域的经纪人: 3
auto.create.topics.enable =真
default.replication.factor =3
min.insync.replicas =2
num.io.threads =8
num.network.threads = 5
分区数=40
num.replica.fetchers =2
副本.lag.time.max.ms = 30000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes =104857600
socket.send.buffer.bytes = 102400
unclean.leader.election.enable =真
zookeeper.session.timeout.ms = 18000
log.retention.ms = 259200000
这是我们为每个消费者使用的配置
我们正在使用下面的代码来实现从 Kafka 到空接收器的流程
代码中的库版本
观察如下,
- 假设在 1 小时的连续间隔中,只有一些消费者
以预期的速度积极地消耗延迟和处理。 - 在接下来的 1 小时内,其他一些消费者变得活跃并
从其分区中主动消费,然后停止处理。 - 从 offsetLag 图表中观察到,所有滞后都将在一次拍摄中清除。
我们希望所有消费者并行运行并实时处理消息。这 3 天的处理延迟给我们造成了很大的停机时间。我尝试按照给定的链接,但我们已经在固定版本 https://github.com/akka/alpakka-kafka/issues/549
任何人都可以帮助我们在消费者配置或其他问题方面缺少什么。
scala - 为什么启用自动提交的 Kafka 客户端在消费者关闭期间提交最新生成的消息的偏移量,即使该消息尚未被消费?
TLDR:
- 对于启用自动提交的 Kafka 客户端,提交生成的消息的偏移量是否已消耗(即使不是)预期行为?(对于消费和生产相同主题的应用程序)
详细解释:
我有一个简单的 scala 应用程序,它有一个 Akka 演员,它使用来自 Kafka 主题的消息,如果在消息处理期间发生任何异常,则将消息生成到同一主题。
该演员每分钟开始并运行 20 秒然后停止。
使用此配置,在停止时,Kafka 客户端将提交最新生成的消息的偏移量,就好像它已被消费一样。
示例日志:
从日志中可以看到,产生了一条偏移量为 107 的消息,但在下一个周期中没有被消费。
实际上,我不是 Akka 演员的专家,不知道这种情况是来自 Kafka 还是 Akka,但似乎与我的自动提交有关。
使用的依赖版本:
可以从此存储库获得示例源代码和重现该情况的步骤