7

我是 Kafka 的新手,我们的团队正在研究服务间通信的模式。

目标

我们有两个服务,P(生产者)和 C(消费者)。P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将所有当前数据从 P 加载到它的缓存中,然后订阅更改通知。(换句话说,我们希望在服务之间同步数据。)

数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。

我们希望将服务解耦,这样 P 和 C 就不需要相互了解。

提案

当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个聚合,其 ID 为一个键。

当 C 启动时,它会从主题的开头读取所有消息并填充其缓存。然后它会继续从其偏移量中读取以收到更新通知。

当 P 更新其数据时,它会为已更改的聚合发布一条消息。(此消息与原始消息具有相同的架构。)

当 C 收到一条新消息时,它会更新其缓存中的相应数据。

在此处输入图像描述

约束

我们正在使用Confluent REST 代理与 Kafka 进行通信。

问题

当 C 启动时,它如何知道它何时从主题中读取了所有消息,以便它可以安全地开始处理?

如果 C 没有立即注意到 P 一秒钟前发送的消息,这是可以接受的。如果 C 在消费一小时前 P 发送的消息之前开始处理,这是不可接受的。请注意,我们不知道何时会更新 P 的数据。

我们不希望 C 在消费每条消息后必须等待 REST 代理的轮询间隔。

4

2 回答 2

3

如果你想找到一个消费者组的结束分区,为了知道你什么时候在某个时间点获得了所有数据,你可以使用

POST /consumers/(string: group_name)/instances/(string: instance)/positions/end

请注意,您必须GET /consumers/.../records在该搜索之前进行轮询(),但您不需要提交。

如果您不想影响现有消费者组的偏移量,则必须单独发布一个。

然后,您可以查询偏移量

GET /consumers/(string: group_name)/instances/(string: instance)/offsets

请注意,在计算结束偏移量和实际到达结束之间可能会有数据写入主题,因此您可能希望在最终到达结束后进行一些额外的设置以进行更多的消费。

于 2019-07-29T18:15:21.753 回答
0

替代解决方案(未测试):

如果消费者也充当生产者怎么办?

  1. 当 C 启动时,它使用一个不会与 P 中的键重叠的键向压缩主题(它将读取的主题相同)发布一条消息。该值是 GUID 或随机数;基本上是一个随机数。
  2. C 订阅压缩主题并开始消费。
  3. 当 C 接收到它的唯一密钥,其随机数与它发送的内容匹配(如果清理线程尚未压缩日志,它可能会多次获取密钥),它知道它可以安全地开始处理。

这确实假设一个分区。

于 2019-07-31T21:47:36.537 回答