假设我有一个包含许多分区的主题。我在其中写入 K/V 数据,并希望通过键在Tumbling Windows中聚合所述数据。
假设我启动了与分区一样多的工作实例,并且每个工作实例都在单独的机器上运行。
我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。
这是StateStore的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?
假设我有一个包含许多分区的主题。我在其中写入 K/V 数据,并希望通过键在Tumbling Windows中聚合所述数据。
假设我启动了与分区一样多的工作实例,并且每个工作实例都在单独的机器上运行。
我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。
这是StateStore的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?
我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。
一般来说,Kafka Streams 确保相同键的所有值将由相同(且只有一个)流任务处理,这也意味着只有一个应用程序实例(您描述为“工作实例”)将处理该值钥匙。请注意,一个应用实例可能会运行 1+ 个流任务,但这些任务是隔离的。
这种行为是通过对数据进行分区来实现的,Kafka Streams 确保分区始终由相同且仅由一个流任务处理。键/值的逻辑链接是,在 Kafka 和 Kafka Streams 中,键总是被发送到同一个分区(这里有一个问题,但我不确定是否有必要详细介绍这个问题),因此一个特定的分区 - 在可能的许多分区中 - 包含同一键的所有值。
在某些情况下,例如当连接两个流A
和B
确保相关的输入流分区并因此匹配键(分别来自A
和B
)在同一个流任务中可用。您在此处使用的典型方法是selectKey()
. 一旦完成,Kafka Streams 确保,为了连接两个流 A 和 B 以及创建连接的输出流,相同键的所有值将由相同的流任务处理,从而由相同的应用程序实例处理。
例子:
A
有 keyuserId
和 value { georegion }
。B
有 keygeoregion
和 value { continent, description }
。只有当两个流使用相同的密钥时,才可以加入两个流(从 Kafka 0.10.0 开始)。在此示例中,这意味着您必须重新设置密钥(并因此重新分区)流A
,以便将生成的密钥从 更改userId
为georegion
。否则,从 Kafka 0.10 开始,您无法加入A
,B
因为数据不在负责实际执行加入的流任务中。
在此示例中,您可以通过以下方式重新键入/重新分区流A
:
// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")
// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))
仅在 Kafka 0.10.0中through()
需要调用才能真正触发重新分区,更高版本的 Kafka 会自动为您完成这些(即将推出的功能已经完成并在 Kafka 中可用trunk
)。
这是 StateStore 的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?
一般来说,没有。上述行为是通过分区实现的,而不是通过状态存储。
有时由于您为流定义的操作而涉及到状态存储,这可能解释了您问这个问题的原因。例如,窗口操作将需要管理状态,因此将在幕后创建状态存储。但是您的实际问题-“确保生成的聚合包括每个键的所有值”-与状态存储无关,它与分区行为有关。
对于工作实例,我假设您的意思是 Kafka Streams 应用程序实例,对吧?(因为 Kafka Streams 中没有 master/worker 模式——它是一个库而不是一个框架——我们不使用术语“worker”。)
如果要按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就被写入主题时,您的数据要么由外部生产者按键分区。或者您在 Kafka Streams 应用程序中显式设置一个新密钥(例如使用selectKey()
or map()
)并通过调用through()
. through()
(在未来的版本中将不需要显式调用,即, 0.10.1
Kafka Streams 将在必要时自动重新分发记录。)
如果应该对消息/记录进行分区,则键不能是null
. 您还可以通过生产者配置更改分区架构partitioner.class
(请参阅https://kafka.apache.org/documentation.html#producerconfigs)。
分区完全独立于 StateStores,即使 StateStores 通常用于分区数据之上。