11

我的流有一个名为“类别”的列,我在不同的商店中为每个“类别”提供了额外的静态元数据,它每隔几天更新一次。进行此查找的正确方法是什么?Kafka 流有两种选择

  1. 在 Kafka Streams 之外加载静态数据并仅用于KStreams#map()添加元数据。这是可能的,因为 Kafka Streams 只是一个库。

  2. 将元数据加载到 Kafka 主题,将其加载到 aKTable并执行KStreams#leftJoin(),这似乎更自然,并将分区等留给 Kafka Streams。但是,这要求我们保持KTable加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。

    • 例如,假设最初只有一个类别“c1”。Kafka 流应用程序已正常停止,然后再次重新启动。重新启动后,添加了一个新类别“c2”。我的假设是,table = KStreamBuilder().table('metadataTopic') 将只有值'c2',因为这是自应用程序第二次启动以来唯一发生变化的事情。我希望它有'c1'和'c2'。
    • 如果它也有'c1',是否会从 KTable 中删除数据(可能通过设置发送 key = null 消息?)?

以上哪一项是查找元数据的正确方法?

是否可以始终强制在重新启动时从头开始读取一个流,这样所有元数据都可以加载到KTable.

还有其他使用商店的方法吗?

4

3 回答 3

14
  1. 在 Kafka Streams 之外加载静态数据,只需使用 KStreams#map() 添加元数据。这是可能的,因为 Kafka Streams 只是一个库。

这行得通。但通常人们会选择您列出的下一个选项,因为用于丰富输入流的辅助数据通常不是完全静态的;相反,它正在发生变化,但并不频繁:

  1. 将元数据加载到 Kafka 主题,将其加载到 KTable 并执行 KStreams#leftJoin(),这似乎更自然,并将分区等留给 Kafka Streams。但是,这要求我们保持 KTable 加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。

这是通常的方法,除非您有特定的理由不这样做,否则我建议您坚持下去。

但是,这要求我们保持 KTable 加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。

所以我猜你也更喜欢第二种选择,但你担心这是否有效。

简短的回答是:是的,KTable 将加载每个键的所有(最新)值。该表将包含整个查找数据,但请记住,KTable 在幕后进行了分区:例如,如果您的输入主题(用于表)具有3分区,那么您最多可以运行3应用程序的实例,每个其中获取1表的分区(假设数据均匀分布在分区中,那么表的每个分区/共享将保存大约 1/3 的表数据)。因此,在实践中,它更有可能“正常工作”。我在下面分享更多细节。

全局 KTables:或者,您可以使用全局 KTables而不是(分区的)普通表变体。使用全局表,应用程序的每个实例都具有表数据的完整副本。这使得全局表对于连接场景非常有用,包括根据您的问题丰富 KStream。

是否可以始终强制在重新启动时从头开始读取一个流,这样所有元数据都可以加载到 KTable 中。

你不必担心这一点。简单地说,如果没有可用的表的本地“副本”,那么 Streams API 将自动确保从头开始完全读取表的数据。如果有可用的本地副本,那么您的应用程序将重新使用该副本(并在表的输入主题中有新数据时更新其本地副本)。

带示例的更长答案

想象一下你的以下输入数据(想想:changelog 流)KTable,注意这个输入是如何由6消息组成的:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)

以下是此输入导致的“逻辑”的各种状态KTable,其中每个新接收到的输入消息(例如(alice, 1))都会导致表的新状态:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

您可以在这里看到的是,即使输入数据可能有很多很多消息(或您所说的“更改”;在这里,我们有6),结果中的条目/行数KTable(正在经历基于连续突变在新收到的输入上)是输入中唯一键的数量(这里:从 开始1,逐渐上升到3),这通常明显少于消息的数量。因此,如果输入中的消息数是N并且这些消息的唯一键数是M,那么通常M << N(M明显小于N;另外,为了记录,我们有不变量M <= N)。

这是“这要求我们保持 KTable 加载所有值”通常不是问题的第一个原因,因为每个键只保留最新值。

第二个有帮助的原因是,正如 Matthias J. Sax 所指出的,Kafka Streams 使用 RocksDB 作为此类表的默认存储引擎(更准确地说:状态将其存储在表中)。RocksDB 允许您维护大于应用程序可用主内存/Java 堆空间的表,因为它可能会溢出到本地磁盘。

最后,第三个原因是 aKTable被分区了。因此,如果您的表的输入主题(例如)配置了3分区,那么幕后发生的事情就是它KTable本身以相同的方式分区(认为:分片)。在上面的示例中,您可以得到以下结果,尽管确切的“拆分”取决于原始输入数据在表输入主题的分区中的分布方式:

逻辑 KTable(我上面显示的最后状态):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

实际的 KTable,分区(假设3表的输入主题的分区,加上键 = 用户名均匀分布在分区中):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

在实践中,输入数据的这种分区(除其他外)允许您“调整” KTable 的实际表现形式。

另一个例子:

  • 想象一下,您的 KTable 的最新状态通常具有 1 TB 的大小(同样,近似大小是表输入数据中唯一消息键的数量乘以相关消息值的平均大小的函数)。
  • 如果表的输入主题只有一个1分区,那么 KTable 本身也只有一个1分区,大小为 1 TB。在这里,因为输入主题只有1分区,所以您可以使用最多1应用程序实例来运行您的应用程序(所以并不是真正的大量并行性,呵呵)。
  • 如果表的输入主题有500分区,那么 KTable 也有500分区,每个分区的大小约为 2 GB(假设数据均匀分布在分区中)。在这里,您可以使用最多500应用程序实例来运行您的应用程序。如果您要运行精确的500实例,那么每个应用程序实例将获得1逻辑 KTable 的精确分区/分片,从而最终得到 2 GB 的表数据;如果你只运行100实例,那么每个实例都会得到500 / 100 = 5表的分区/分片,最终得到大约2 GB * 5 = 10 GB表数据。
于 2016-12-08T09:12:08.327 回答
6

您的整体观察是正确的,这取决于哪些权衡对您更重要。如果您的元数据很小,选项 1 似乎更好。如果元数据很大,似乎选项 2 是要走的路。

如果您使用map(),则需要在每个应用程序实例中拥有元数据的完整副本(因为您无法确切知道 Streams 将如何对您的KStream数据进行分区)。因此,如果您的元数据不适合主内存,则使用map()将无法轻松工作。

如果您使用KTable,Streams 将确保元数据在所有正在运行的应用程序实例上正确分片,这样就不需要数据重复。此外,aKTable使用 RocksDB 作为状态存储引擎,因此可以溢出到磁盘。

编辑开始

关于将所有数据放入KTable:如果同一键有两个类别,如果您将数据直接从主题读取到KTablevia builder.table(...)(更改日志语义),则第二个值将覆盖第一个值。但是,您可以通过将主题读取为记录流来轻松解决此问题(即,builder.stream(...)并应用聚合来计算KTable. 您的聚合将简单地发出每个键的所有值的列表。

关于删除:KTable使用 changelog 语义并且确实理解 tombstone 消息来删除键值对。因此,如果您KTable从主题中读取 a 并且该主题包含一条<key:null>消息,则使用此键的当前记录KTable将被删除。当 是聚合的结果时,这更难实现KTable,因为带有null键或null值的聚合输入记录将被忽略并且不会更新聚合结果。

解决方法是在聚合之前添加一个map()步骤并引入一个NULL值(即,代表墓碑但不是用户定义的“对象” null- 在您的情况下,您可以将其称为 a null-category)。null在您的聚合中,如果输入记录具有值,您只需返回一个值作为聚合结果null-category。然后,这将为您翻译一条墓碑消息,KTable并删除此键的当前类别列表。

编辑结束

当然,您始终可以通过处理器 API 构建自定义解决方案。但是,如果 DSL 可以满足您的需要,那么没有充分的理由这样做。

于 2016-12-08T06:22:58.347 回答
0

从 2017 年 2 月发布的 Kafka 0.10.2.0 开始,该GlobalKTable概念可能是使用查找数据丰富流的更好选择。

https://docs.confluent.io/current/streams/concepts.html#globalktable

于 2019-02-16T17:38:48.063 回答