5

假设我有一个包含许多分区的主题。我在其中写入 K/V 数据,并希望通过键在Tumbling Windows中聚合所述数据。

假设我启动了与分区一样多的工作实例,并且每个工作实例都在单独的机器上运行。

我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。

这是StateStore的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?

4

2 回答 2

9

我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。

一般来说,Kafka Streams 确保相同键的所有值将由相同(且只有一个)流任务处理,这也意味着只有一个应用程序实例(您描述为“工作实例”)将处理该值钥匙。请注意,一个应用实例可能会运行 1+ 个流任务,但这些任务是隔离的。

这种行为是通过对数据进行分区来实现的,Kafka Streams 确保分区始终由相同且仅由一个流任务处理。键/值的逻辑链接是,在 Kafka 和 Kafka Streams 中,键总是被发送到同一个分区(这里有一个问题,但我不确定是否有必要详细介绍这个问题),因此一个特定的分区 - 在可能的许多分区中 - 包含同一键的所有值。

在某些情况下,例如当连接两个流AB确保相关的输入流分区并因此匹配键(分别来自AB)在同一个流任务中可用。您在此处使用的典型方法是selectKey(). 一旦完成,Kafka Streams 确保,为了连接两个流 A 和 B 以及创建连接的输出流,相同键的所有值将由相同的流任务处理,从而由相同的应用程序实例处理。

例子:

  • StreamA有 keyuserId和 value { georegion }
  • StreamB有 keygeoregion和 value { continent, description }

只有当两个流使用相同的密钥时,才可以加入两个流(从 Kafka 0.10.0 开始)。在此示例中,这意味着您必须重新设置密钥(并因此重新分区)流A,以便将生成的密钥从 更改userIdgeoregion。否则,从 Kafka 0.10 开始,您无法加入AB因为数据不在负责实际执行加入的流任务中。

在此示例中,您可以通过以下方式重新键入/重新分区流A

// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")

// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))

仅在 Kafka 0.10.0中through()需要调用才能真正触发重新分区,更高版本的 Kafka 会自动为您完成这些(即将推出的功能已经完成并在 Kafka 中可用trunk)。

这是 StateStore 的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?

一般来说,没有。上述行为是通过分区实现的,而不是通过状态存储。

有时由于您为流定义的操作而涉及到状态存储,这可能解释了您问这个问题的原因。例如,窗口操作将需要管理状态,因此将在幕后创建状态存储。但是您的实际问题-“确保生成的聚合包括每个键的所有值”-与状态存储无关,它与分区行为有关。

于 2016-09-02T07:37:16.480 回答
1

对于工作实例,我假设您的意思是 Kafka Streams 应用程序实例,对吧?(因为 Kafka Streams 中没有 master/worker 模式——它是一个库而不是一个框架——我们不使用术语“worker”。)

如果要按键共同定位数据,则​​需要按键对数据进行分区。因此,当数据从一开始就被写入主题时,您的数据要么由外部生产者按键分区。或者您在 Kafka Streams 应用程序中显式设置一个新密钥(例如使用selectKey()or map())并通过调用through(). through()(在未来的版本中将不需要显式调用,即, 0.10.1Kafka Streams 将在必要时自动重新分发记录。)

如果应该对消息/记录进行分区,则键不能是null. 您还可以通过生产者配置更改分区架构partitioner.class(请参阅https://kafka.apache.org/documentation.html#producerconfigs)。

分区完全独立于 StateStores,即使 StateStores 通常用于分区数据之​​上。

于 2016-09-01T06:32:46.487 回答