我有一个问题,我需要优先处理一些事件以便更早处理,而一些事件可以在高优先级事件之后说。这些事件来自一个源,我需要根据它们的事件类型优先级对流进行优先级排序,以便在高优先级或低优先级接收器中转发。我正在使用 kafka 和 akka kafka 流。所以主要问题是我在给定的时间点获得了大量的流量。这里的首选方案是什么?
1 回答
首先要解决的是偏移提交。因为处理不会按顺序进行,所以在处理后提交偏移量不能保证至少一次(也不能保证最多一次),因为以下顺序是可能的(并且这种概率不能降低到零):
- 在处理多个低优先级消息之前已处理的高优先级消息的提交偏移量
- 流失败(或运行流的实例停止,或其他)
- 流从上次提交的偏移量重新启动
- 低优先级的消息永远不会再从 Kafka 中读取,因此永远不会被处理
这表明要么偏移提交必须在重新排序之前发生,要么我们需要一个已处理但尚未提交的概念,直到低优先级消息被处理。请注意,对于后一种选项,如果存在任何可能在偏移序列中产生间隙(这意味着无限保留且不压缩)的任何东西,那么跟踪未提交的最大偏移量(可能有效的最简单策略)将不起作用,我实际上建议提交处理之前的偏移量,但是一旦处理逻辑保证它最终会处理消息。
Actor 和 Akka Persistence 的组合允许采用这种方法。粗略的大纲是要有一个持久的参与者(这非常适合事件溯源),并且基本上维护要处理的高优先级和低优先级消息的列表。流向参与者发送带有来自 Kafka 的消息的“询问”,参与者在收到消息时将消息分类为高/低优先级,假设消息尚未被处理。消息(可能还有它的分类)作为一个事件被持久化,并且参与者确认收到了消息,并且它承诺通过将消息安排给自己来完全处理“待处理”消息来处理它。确认完成请求,允许将偏移量提交给 Kafka。收到消息后(命令,真的)为了处理一条消息,actor 选择要处理的 Kafka 消息(按优先级、年龄等)并坚持认为它已处理该消息(从而将其从“to-process”移动到“processed”)并且可能还会持续存在与如何解释 Kafka 消息相关的事件更新状态。在这种持久性之后,actor 向自己发送另一个命令来处理“to-process”消息。
然后,通过让后台进程使用“处理到进程消息”命令定期 ping 该参与者来实现容错。
与流一样,这是一个单个逻辑线程每个分区的进程。您可能会在每个物理 Kafka 分区中多路复用多个具有状态价值的分区,在这种情况下,您可以拥有多个这些参与者并从摄取流中发送多个请求。如果这样做,周期性 ping 可能最好由 Akka 持久性查询提供的流来完成,以获取所有持久性参与者的标识符。
请注意,此问题中的重新排序从根本上使其成为一场竞赛,因此是非确定性的:在此设计草图中,竞赛是因为来自参与者 B 的消息 M1 和来自参与者 C 的消息 M2 发送到参与者 A 可能以任何顺序接收(如果Actor B 在发送消息 M1 后向 Actor A 发送了一条消息 M3,M3 将在 M1 之后到达,但可能在 M2 之前或之后到达)。在另一种设计中,可能会根据处理速度相对于 Kafka 使消息可供消费的延迟而发生竞争。