您可以通过使用它访问 KTable 的底层状态存储来获取每个分区中的大致计数,KeyValueStore#approximateNumEntries()
然后将此计数导出到 prometheus(每个分区都有一个计数)。
要访问底层状态存储,您可以使用低级处理器 APIKeyValueStore
通过每个 StreamTask 中的每个 ProcessorContext (对应于一个分区)来访问一个。只需添加一个KStream#transformValues()
到您的拓扑:
kStream
...
.transformValues(ExtractCountTransformer::new, "your_ktable_name")
...
并在 ExtractCountTransformer 中提取计数到普罗米修斯:
@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {
private KeyValueStore<String, String> yourKTableKvStore;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
}
@Override
public String transform(String readOnlyKey, String value) {
//extract count to prometheus
log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
yourKTableKvStore.approximateNumEntries();
return value;
}
@Override
public void close() {
}
}