1

在我的 Kafka Streams 应用程序中,我有一个任务来设置一个预定的(按墙上时间)标点符号。标点符号遍历商店的条目并对其进行处理。像这样:

var store = context().getStateStore("MyStore");
var iter = store.all();

while (iter.hasNext()) {
   var entry = iter.next();
   // ... do something with the entry
}

// Print a summary (now): N entries processed
// Print a summary (wish): N entries processed in partition P

由于我在这里使用单个存储(可能已分区),因此我假设标点符号的每次执行都绑定到该存储的单个分区。

是否可以找出标点符号在哪个分区上运行?java docs forProcessorContext.partition()声明此方法-1在标点符号中返回。

我已经阅读了Kafka Streams: Punctuate vs Process以及那里的答案。我可以理解,一般来说,任务不依赖于特定的分区。但是迭代器应该与 IMO 绑定。

我怎样才能找到分区?

还是我假设存储迭代器的特定实例与分区相关联是错误的?

我需要它做什么:我想在一些日志消息中包含分区号。现在,我有几条几乎相同的日志消息,说明标点符号做了这个和那个。为了使这些消息“唯一”,我想将分区号包含在其中。

4

1 回答 1

0

只是在这里发布https://issues.apache.org/jira/browse/KAFKA-12328中提供的答案:

我刚用过context.taskId()。它在值的末尾包含下划线之后的分区号。这对我来说已经足够了。

于 2021-02-18T17:14:29.510 回答