0

我有一个典型的 samza 任务,它消耗 2 个主题:dataconfig,并将来自的消息config作为本地状态存储在 RocksDB 中,以检查来自的消息data是否正常。

如果这两个主题中的每一个都只有一个分区,则此任务可以正常工作。一旦我分成data十个分区并config保持一个分区,事情就发生了变化。默认情况下,samza 会创建 10 个任务来消费 topic 的 partition 0 ~ 9,data并且只有 task 0 消费configtopic:

task[0] -> config, data[0] task[1] -> data[1] ... task[9] -> data[9]

似乎每个任务都使用自己的rocksdb实例进行初始化,因此只有task[0]将所有配置数据存储在其rocksdb实例中,task[1~9]没有配置数据,因此无法找到传入数据的配置信息。

我期望的是每个任务都使用来自其数据分区和配置流的消息,如下所示:

task[0] -> config, data[0] task[1] -> config, data[1] ... task[9] -> config, data[9]

有什么办法可以做到这一点?

4

1 回答 1

4

输入流分区的分布由使用“job.systemstreampartition.grouper.factor”配置的可插入分组器控制。默认情况下,此类跨任务实例对传入流分区进行分组。默认情况下,我相信它会使用 GroupByPartitionId。这就是您在任务 [0] 中看到数据 [0] 和配置 [0] 的原因。

您可以实现自定义 SSPGrouper。但是,您正在寻找的是将“数据”流视为常规输入流,将“配置”流视为“广播”输入流。广播意味着 Samza 作业中的每个任务都从该流的分区中读取。这样,每个任务实例都可以使用配置流的数据填充其本地 RocksDB。您可以将广播流配置为: task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]

对于您的情况,您可以配置: task.inputs = <systemName>.data task.broadcast.inputs = <systemName>.config#0

查看Samza 中的广播流

于 2017-03-24T06:53:45.960 回答