0

我想做一个 KStream 到 KTable Join。使用 KTable 作为查找表。以下步骤显示了执行代码的顺序

  1. 构造 KTable

  2. ReKey KTable

  3. 构造 KStream

  4. ReKey KStream

  5. 加入 KStream - KTable

假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,假设 KStreams 中的每个键在 KTable 中都有一条记录。所以预期的输出将是 8000 条记录。

每次我第一次加入或启动应用程序时。预期输出为 8000 条记录,但有时我只看到 6200 条记录,有时 8000 条完整的记录集(两次),有时没有记录,等等。

  • 问题1:为什么每次运行应用程序时记录都不一致?

    在 KTable 被构造(construct + Rekey)之前,KStreams 被构造并且数据可用于从 KStream 端连接,然后连接从 KTable 开始,因此在 KTable 构造之前不会在最终连接中看到数据。一旦构建了 KTable,我们就可以看到剩余记录的连接发生了。

  • 问题2:如何解决记录中加入不一致的问题?

    我尝试使用嵌入式 Kafka 进行 KStream 和 Ktable 连接的测试用例。有 10 条来自 KStreams 的记录和 3 条来自 KTable 的记录用于进程。当我第一次运行测试用例时,没有加入,加入后我没有看到任何数据。当第二次运行时,它运行完美。如果我清除状态存储然后回到零。

  • 问题3:为什么会发生这种行为?

    我尝试使用 KSQL,连接运行良好,我得到了 8000 条记录,然后我进入 KSQL 源代码,我注意到 KSQL 也在执行相同的连接功能。

  • 问题 4:KSQL 是如何解决这个问题的?

我看到几个示例建议的答案

我正在使用 spring 云流作为依赖项。

我还看到在 JIRA 的某个地方有一个未解决的问题。

4

1 回答 1

1

以下步骤显示了执行代码的顺序

请注意,构建拓扑只是提供数据流程序的逻辑描述,并没有不同运算符的“执行顺序”。程序将被翻译,所有操作员将同时执行。因此,来自所有主题的数据将被并行读取。

这种并行处理是您观察到的根本原因,即在处理开始之前没有首先加载表(至少默认情况下不能保证),因此即使表没有完全加载,也可以处理流端数据。

不同主题之间的处理顺序取决于记录的时间戳:时间戳较小的记录首先处理。因此,如果要确保 KTable 数据首先被处理,则必须确保记录时间戳小于流侧记录时间戳。这可以在您将输入数据生成到输入主题时或通过使用自定义时间戳提取器来确保。

其次,从主题中获取数据是不确定的,因此,如果仅返回流端的数据(而不是表端数据),则无法进行时间戳比较,因此流端数据将在表端数据之前处理. 要解决此问题,您可以增加配置参数max.task.idle.ms(默认为0ms)。如果增加此配置(我相信这也是 KSQL 默认情况下所做的),如果一个输入没有数据,则任务将阻塞并尝试为空输入获取数据(只有在空闲时间过去后,处理才会继续即使一侧是空的)。

对于一个GlobalKTable行为是不同的。该表将在任何处理开始之前在启动时加载。因此,我不确定为什么这对您不起作用。

于 2020-03-10T05:06:22.747 回答