我有一个混合匹配的 Scala 拓扑结构,其中主要工作人员是 PAPI 处理器,其他部分通过 DSL 连接。
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
整个主题的数据(包括 original eventsTopic
)通过 a 进行分区,我们称它为DoubleKey
具有两个字段。visitorsTopic
访问者通过 Sink发送到:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
在 DSL 中,我在这个主题上创建了一个 KV KTable:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
我后来连接到EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
一切都是共同分区的(通过 DoubleKey)。visitorSinkPartitioner
执行典型的模运算:
Math.abs(partitionKey.hashCode % numPartitions)
在 PAPI 处理器 EventsProcessor 中,我查询此表以查看是否已经存在访问者。
但是,在我的测试中(使用EmbeddedKafka,但这应该没有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor 检查 KTable 上的两个事件DoubleKey
和第二个事件 - 有一些延迟 -它可以看到Visitor
商店中的存在),但如果我用更高的数字运行它,EventProcessor 永远不会看到商店中的值。
但是,如果我通过 API(迭代)检查商店store.all()
,记录就在那里。所以我知道它必须去不同的分区。
由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用显式分区程序调用相同的代码),因此 KTable 应该在同一个分区上获取该数据。
我的假设正确吗?会发生什么?
卡夫卡流 1.0.0,斯卡拉 2.12.4。
PS。当然,它可以在 PAPI 上执行put
s 通过 PAPI 而不是创建存储StreamsBuilder.table()
,因为这肯定会使用代码运行的相同分区,但这是不可能的。