- 在 Kafka Streams 之外加载静态数据,只需使用 KStreams#map() 添加元数据。这是可能的,因为 Kafka Streams 只是一个库。
这行得通。但通常人们会选择您列出的下一个选项,因为用于丰富输入流的辅助数据通常不是完全静态的;相反,它正在发生变化,但并不频繁:
- 将元数据加载到 Kafka 主题,将其加载到 KTable 并执行 KStreams#leftJoin(),这似乎更自然,并将分区等留给 Kafka Streams。但是,这要求我们保持 KTable 加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。
这是通常的方法,除非您有特定的理由不这样做,否则我建议您坚持下去。
但是,这要求我们保持 KTable 加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。
所以我猜你也更喜欢第二种选择,但你担心这是否有效。
简短的回答是:是的,KTable 将加载每个键的所有(最新)值。该表将包含整个查找数据,但请记住,KTable 在幕后进行了分区:例如,如果您的输入主题(用于表)具有3
分区,那么您最多可以运行3
应用程序的实例,每个其中获取1
表的分区(假设数据均匀分布在分区中,那么表的每个分区/共享将保存大约 1/3 的表数据)。因此,在实践中,它更有可能“正常工作”。我在下面分享更多细节。
全局 KTables:或者,您可以使用全局 KTables而不是(分区的)普通表变体。使用全局表,应用程序的每个实例都具有表数据的完整副本。这使得全局表对于连接场景非常有用,包括根据您的问题丰富 KStream。
是否可以始终强制在重新启动时从头开始读取一个流,这样所有元数据都可以加载到 KTable 中。
你不必担心这一点。简单地说,如果没有可用的表的本地“副本”,那么 Streams API 将自动确保从头开始完全读取表的数据。如果有可用的本地副本,那么您的应用程序将重新使用该副本(并在表的输入主题中有新数据时更新其本地副本)。
带示例的更长答案
想象一下你的以下输入数据(想想:changelog 流)KTable
,注意这个输入是如何由6
消息组成的:
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
以下是此输入导致的“逻辑”的各种状态KTable
,其中每个新接收到的输入消息(例如(alice, 1)
)都会导致表的新状态:
Key Value
--------------
alice | 1 // (alice, 1) received
|
V
Key Value
--------------
alice | 1
bob | 40 // (bob, 40) received
|
V
Key Value
--------------
alice | 2 // (alice, 2) received
bob | 40
|
V
Key Value
--------------
alice | 2
bob | 40
charlie | 600 // (charlie, 600) received
|
V
Key Value
--------------
alice | 5 // (alice, 5) received
bob | 40
charlie | 600
|
V
Key Value
--------------
alice | 5
bob | 22 // (bob, 22) received
charlie | 600
您可以在这里看到的是,即使输入数据可能有很多很多消息(或您所说的“更改”;在这里,我们有6
),结果中的条目/行数KTable
(正在经历基于连续突变在新收到的输入上)是输入中唯一键的数量(这里:从 开始1
,逐渐上升到3
),这通常明显少于消息的数量。因此,如果输入中的消息数是N
并且这些消息的唯一键数是M
,那么通常M << N
(M
明显小于N
;另外,为了记录,我们有不变量M <= N
)。
这是“这要求我们保持 KTable 加载所有值”通常不是问题的第一个原因,因为每个键只保留最新值。
第二个有帮助的原因是,正如 Matthias J. Sax 所指出的,Kafka Streams 使用 RocksDB 作为此类表的默认存储引擎(更准确地说:状态将其存储在表中)。RocksDB 允许您维护大于应用程序可用主内存/Java 堆空间的表,因为它可能会溢出到本地磁盘。
最后,第三个原因是 aKTable
被分区了。因此,如果您的表的输入主题(例如)配置了3
分区,那么幕后发生的事情就是它KTable
本身以相同的方式分区(认为:分片)。在上面的示例中,您可以得到以下结果,尽管确切的“拆分”取决于原始输入数据在表输入主题的分区中的分布方式:
逻辑 KTable(我上面显示的最后状态):
Key Value
--------------
alice | 5
bob | 22
charlie | 600
实际的 KTable,分区(假设3
表的输入主题的分区,加上键 = 用户名均匀分布在分区中):
Key Value
--------------
alice | 5 // Assuming that all data for `alice` is in partition 1
Key Value
--------------
bob | 22 // ...for `bob` is in partition 2
Key Value
--------------
charlie | 600 // ...for `charlie` is in partition 3
在实践中,输入数据的这种分区(除其他外)允许您“调整” KTable 的实际表现形式。
另一个例子:
- 想象一下,您的 KTable 的最新状态通常具有 1 TB 的大小(同样,近似大小是表输入数据中唯一消息键的数量乘以相关消息值的平均大小的函数)。
- 如果表的输入主题只有一个
1
分区,那么 KTable 本身也只有一个1
分区,大小为 1 TB。在这里,因为输入主题只有1
分区,所以您可以使用最多1
应用程序实例来运行您的应用程序(所以并不是真正的大量并行性,呵呵)。
- 如果表的输入主题有
500
分区,那么 KTable 也有500
分区,每个分区的大小约为 2 GB(假设数据均匀分布在分区中)。在这里,您可以使用最多500
应用程序实例来运行您的应用程序。如果您要运行精确的500
实例,那么每个应用程序实例将获得1
逻辑 KTable 的精确分区/分片,从而最终得到 2 GB 的表数据;如果你只运行100
实例,那么每个实例都会得到500 / 100 = 5
表的分区/分片,最终得到大约2 GB * 5 = 10 GB
表数据。