我想做一个 KStream 到 KTable Join。使用 KTable 作为查找表。以下步骤显示了执行代码的顺序
构造 KTable
ReKey KTable
构造 KStream
ReKey KStream
加入 KStream - KTable
假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,假设 KStreams 中的每个键在 KTable 中都有一条记录。所以预期的输出将是 8000 条记录。
每次我第一次加入或启动应用程序时。预期输出为 8000 条记录,但有时我只看到 6200 条记录,有时 8000 条完整的记录集(两次),有时没有记录,等等。
问题1:为什么每次运行应用程序时记录都不一致?
在 KTable 被构造(construct + Rekey)之前,KStreams 被构造并且数据可用于从 KStream 端连接,然后连接从 KTable 开始,因此在 KTable 构造之前不会在最终连接中看到数据。一旦构建了 KTable,我们就可以看到剩余记录的连接发生了。
问题2:如何解决记录中加入不一致的问题?
我尝试使用嵌入式 Kafka 进行 KStream 和 Ktable 连接的测试用例。有 10 条来自 KStreams 的记录和 3 条来自 KTable 的记录用于进程。当我第一次运行测试用例时,没有加入,加入后我没有看到任何数据。当第二次运行时,它运行完美。如果我清除状态存储然后回到零。
问题3:为什么会发生这种行为?
我尝试使用 KSQL,连接运行良好,我得到了 8000 条记录,然后我进入 KSQL 源代码,我注意到 KSQL 也在执行相同的连接功能。
问题 4:KSQL 是如何解决这个问题的?
我看到几个示例建议的答案
- 使用不起作用的GlobalKTable。我得到了同样不一致的加入。
- 使用自定义连接器https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java 这不起作用
我正在使用 spring 云流作为依赖项。
我还看到在 JIRA 的某个地方有一个未解决的问题。