0

我有一个混合匹配的 Scala 拓扑结构,其中主要工作人员是 PAPI 处理器,其他部分通过 DSL 连接。

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

整个主题的数据(包括 original eventsTopic)通过 a 进行分区,我们称它为DoubleKey具有两个字段。visitorsTopic访问者通过 Sink发送到:

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

在 DSL 中,我在这个主题上创建了一个 KV KTable:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(),
  Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

我后来连接到EventProcessor

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

一切都是共同分区的(通过 DoubleKey)。visitorSinkPartitioner执行典型的模运算:

Math.abs(partitionKey.hashCode % numPartitions)

在 PAPI 处理器 EventsProcessor 中,我查询此表以查看是否已经存在访问者。

但是,在我的测试中(使用EmbeddedKafka,但这应该没有什么区别),如果我用一个分区运行它们,一切都很好(EventsProcessor 检查 KTable 上的两个事件DoubleKey和第二个事件 - 有一些延迟 -它可以看到Visitor商店中的存在),但如果我用更高的数字运行它,EventProcessor 永远不会看到商店中的值。

但是,如果我通过 API(迭代)检查商店store.all(),记录就在那里。所以我知道它必须去不同的分区。

由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用显式分区程序调用相同的代码),因此 KTable 应该在同一个分区上获取该数据。

我的假设正确吗?会发生什么?

卡夫卡流 1.0.0,斯卡拉 2.12.4。

PS。当然,它可以在 PAPI 上执行puts 通过 PAPI 而不是创建存储StreamsBuilder.table(),因为这肯定会使用代码运行的相同分区,但这是不可能的。

4

1 回答 1

1

是的,假设是正确的。

如果它对任何人有帮助:

将分区器传递给 Scala EmbeddedKafka 库时遇到问题。在其中一个测试套件中,它没有正确完成。现在,随着重构的健康实践,我在这个拓扑的所有套件中都使用了这种方法。

def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
    EmbeddedKafkaConfig = {
    val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
        classOf[DoubleKeyPartitioner].getCanonicalName)
    EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
        customProducerProperties = producerProperties)
}
于 2018-07-27T16:07:57.113 回答