3

我们有以下问题:

我们想监听某个 Kafka 主题并构建它的“历史” - 因此对于指定的密钥提取一些数据,将其添加到该密钥的现有列表中(或者如果它不存在则创建一个新列表)并将其放入另一个主题,它只有一个分区并且高度压缩。另一个应用程序可以只收听该主题并更新其历史列表。

我在想它如何适合 Kafka 流库。我们当然可以使用聚合:

msgReceived.map((key, word) -> new KeyValue<>(key, word))
           .groupBy((k,v) -> k, stringSerde, stringSerde)
           .aggregate(String::new,
                     (k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
                     stringSerde, "summaries2")
           .to(stringSerde, stringSerde, "transaction-summary50");

它创建了一个由 Kafka 支持的本地存储并将其用作历史表。

我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的实例都会创建一个新的支持主题${applicationId}-${storeName}-changelog(我假设每个应用程序都有不同的applicationId)。每个实例开始使用输入主题,获取一组不同的键并构建不同的状态子集。如果 Kafka 决定重新平衡,一些实例将开始错过本地存储中的一些历史状态,因为它们会获得一组全新的分区来消费。

问题是,如果我只是为每个正在运行的实例设置相同的 applicationId,它最终是否应该重放来自每个正在运行的实例具有相同本地状态的同一个 kafka 主题的所有数据?

4

2 回答 2

7

为什么要创建多个具有不同 ID 的应用程序来执行相同的工作?Kafka 实现并行的方式是通过任务:

应用程序的处理器拓扑通过将其分解为多个任务来扩展。

更具体地说,Kafka Streams 根据应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(即 Kafka 主题)。对任务的分区分配永远不会改变,因此每个任务都是应用程序的固定并行单元。

然后,任务可以根据分配的分区实例化自己的处理器拓扑;它们还为每个分配的分区维护一个缓冲区,并一次处理来自这些记录缓冲区的消息。因此,流任务可以独立并行处理,无需人工干预。

如果您需要扩展您的应用程序,您可以启动运行相同应用程序(相同应用程序 ID)的新实例,并且一些已分配的任务将重新分配给新实例。本地状态存储的迁移将由库自动处理:

当重新分配发生时,一些分区——以及它们相应的任务,包括任何本地状态存储——将从现有线程“迁移”到新添加的线程。因此,Kafka Streams 以 Kafka 主题分区的粒度有效地重新平衡了应用程序实例之间的工作负载。

我建议你看看这个指南

于 2017-01-30T13:34:23.607 回答
5

我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的实例都会创建一个新的支持主题 ${applicationId}-${storeName}-changelog(我假设每个应用程序都有不同的 applicationId)。每个实例开始使用输入主题,获取一组不同的键并构建不同的状态子集。如果 Kafka 决定重新平衡,一些实例将开始错过本地存储中的一些历史状态,因为它们会获得一组全新的分区来消费。

有些假设是不正确的:

  • 如果您运行应用程序的多个实例来扩展您的应用程序,所有这些实例都必须具有相同的应用程序 ID(参见 Kafka 的消费者组管理协议)——否则,负载将不会被共享,因为每个实例都将被视为一个自己的应用程序,每个实例都将分配所有分区。

因此,如果所有实例都使用相同的应用程序 ID,则所有正在运行的应用程序实例都将使用相同的变更日志主题名称,因此,您打算执行的操作应该是开箱即用的。

于 2017-01-30T18:04:02.730 回答