问题标签 [akka-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
96 浏览

scala - Lagom 主题订阅者 - 如何在未来异常中重试?

我在lagom下面有一个主题订阅者

订阅这个主题,我使用atLeastOnce作为方法,所以如果有任何异常,我希望流程重新启动/重试。当我抛出一个正常的异常时,它可以继续正常重试

但是当将来发生异常时,无论我如何尝试,Flow都不会重新启动。这是我将来处理异常的尝试之一

它会显示一个异常,但Flow不会自动重新启动它。我应该如何处理/抛出异常Future

0 投票
2 回答
146 浏览

akka - 这个 Akka Kafka Stream 配置是否受益于 Akka Streams 的背压机制?

我们有一个 Akka 应用程序,它使用 Kafka 主题并将接收到的消息发送到 Akka Actor。我不确定我的编程方式是否使用了 Akka Streams 中内置的背压机制的所有好处。

以下是我的配置...

这符合我对业务案例的期望,myActor 收到命令更新(MyAvro)

我对背压的技术概念更加恼火,据我所知,背压机制部分由接收器控制,但在这个流配置中,我的接收器只是“Sink.ignore”。所以我的 Sink 正在为 Back Pressure 做任何事情。

当 Akka Kafka Stream 提交 Kafka Topic 偏移量时,我还很好奇什么?命令发送到 MyActor 邮箱的那一刻?如果是这样,那么我如何处理诸如询问模式之类的场景,Kafka Offset 不应该在询问模式完成之前提交。

我看到一些处理手动偏移控制“plainPartitionedManualOffsetSource”、“commitablePartitionManualOffsetSource”的工厂方法,但我找不到任何示例,我可以用我的业务逻辑决定手动提交偏移吗?

作为替代配置,我可以使用类似的东西。

现在我可以访问 Sink.actorRef,我认为 Back Pressure 机制有机会控制 Back Pressure,自然这段代码将不起作用,因为我不知道如何在这个星座下访问“myAvro”。

谢谢回答..

0 投票
0 回答
30 浏览

.net-core - 对象数组的 Avro 模式定义

我是 Kafka 和 Avro 序列化和反序列化的新手。我在 .NET 中有以下类定义

我有多个生产者,它们为网关产生不同类型的请求。每个请求都会有不同的请求参数,这些参数将根据&object[]的组合在中定义。有一个消费者与网关连接的系统交互,通过它我们可以向网关发送请求。ServiceIdHandlerId

现在我想.avsc为上述数据结构以文件的形式生成 Avro 模式定义,以便所有 GWRequest 必须由生产者在单个主题上发布。我找不到object[] Requestparams在 Avro 中定义的任何线索。

谢谢你的帮助。

0 投票
1 回答
77 浏览

java - 快速处理主题和慢速处理主题 - Akka Kafka

我有一个问题,我需要优先处理一些事件以便更早处理,而一些事件可以在高优先级事件之后说。这些事件来自一个源,我需要根据它们的事件类型优先级对流进行优先级排序,以便在高优先级或低优先级接收器中转发。我正在使用 kafka 和 akka kafka 流。所以主要问题是我在给定的时间点获得了大量的流量。这里的首选方案是什么?

0 投票
1 回答
141 浏览

kafka-consumer-api - 当我们的 Kafka 分区中存在延迟时,Akka Kafka Consumer 处理率会急剧下降

我们正面临这样一种情况,即我们的 akka-stream-kaka-consumer 处理率在出现延迟时会下降。当我们在分区中没有任何延迟的情况下启动它时,处理速度会突然增加。

MSK 集群 - 10 个主题 - 每个 40 个分区 => 400 个总领导分区

为了在系统中实现高吞吐量和并行性,我们实现了 akka-stream-kafka 消费者,分别订阅每个主题分区,从而在消费者和分区之间实现 1:1 映射。

这是消费者设置:

  1. ec2 服务实例数 - 7
  2. 每个服务为 10 个主题中的每个主题启动 6 个消费者,从而导致每个服务实例有 60 个消费者。
  3. 总消费者 = 实例数 (7) * 每个服务实例上的消费者数 (60) = 420

因此,我们总共启动了 420 个消费者,分布在不同的实例中。根据 RangeAssignor 分区策略(默认一个),每个分区将分配给不同的消费者,400 个消费者将使用 400 个分区,20 个消费者将保持未使用状态。我们已经验证了这个分配并且看起来不错。

使用的实例类型: c5.xlarge

MSK 配置:

阿帕奇卡夫卡版本- 2.4.1.1

经纪人总数- 9(分布在 3 个可用区)

经纪人类型: kafka.m5.large

每个区域的经纪人: 3

auto.create.topics.enable =真

default.replication.factor =3

min.insync.replicas =2

num.io.threads =8

num.network.threads = 5

分区数=40

num.replica.fetchers =2

副本.lag.time.max.ms = 30000

socket.receive.buffer.bytes = 102400

socket.request.max.bytes =104857600

socket.send.buffer.bytes = 102400

unclean.leader.election.enable =真

zookeeper.session.timeout.ms = 18000

log.retention.ms = 259200000

这是我们为每个消费者使用的配置

我们正在使用下面的代码来实现从 Kafka 到空接收器的流程

代码中的库版本

观察如下,

  1. 假设在 1 小时的连续间隔中,只有一些消费者
    以预期的速度积极地消耗延迟和处理。
  2. 在接下来的 1 小时内,其他一些消费者变得活跃并
    从其分区中主动消费,然后停止处理。
  3. 从 offsetLag 图表中观察到,所有滞后都将在一次拍摄中清除。

我们希望所有消费者并行运行并实时处理消息。这 3 天的处理延迟给我们造成了很大的停机时间。我尝试按照给定的链接,但我们已经在固定版本 https://github.com/akka/alpakka-kafka/issues/549

任何人都可以帮助我们在消费者配置或其他问题方面缺少什么。

每个主题每个分区的偏移滞后图

0 投票
1 回答
98 浏览

scala - 为什么启用自动提交的 Kafka 客户端在消费者关闭期间提交最新生成的消息的偏移量,即使该消息尚未被消费?

TLDR:

  • 对于启用自动提交的 Kafka 客户端,提交生成的消息的偏移量是否已消耗(即使不是)预期行为?(对于消费和生产相同主题的应用程序)

详细解释:

我有一个简单的 scala 应用程序,它有一个 Akka 演员,它使用来自 Kafka 主题的消息,如果在消息处理期间发生任何异常,则将消息生成到同一主题。

TestActor.scala

该演员每分钟开始并运行 20 秒然后停止。

启动器.scala

ExceptionManagerActor.scala

BaseActorWithAutoCommit.scala

使用此配置,在停止时,Kafka 客户端将提交最新生成的消息的偏移量,就好像它已被消费一样。

示例日志:

从日志中可以看到,产生了一条偏移量为 107 的消息,但在下一个周期中没有被消费。

实际上,我不是 Akka 演员的专家,不知道这种情况是来自 Kafka 还是 Akka,但似乎与我的自动提交有关。


使用的依赖版本:

可以从此存储库获得示例源代码和重现该情况的步骤