2

我有一个流拓扑,它从一个主题中消费并运行一个聚合并构建一个 KTable,该 KTable 被具体化到 RocksDB 中。

我有另一个应用程序,它每天使用来自同一主题的所有事件,并为满足某些特定条件(即不再需要它们)的事件发送墓碑消息。聚合处理此问题并从状态存储中删除,但我正在查看监控状态存储的大小或更改日志主题 - 任何真正告诉我 ktable 大小的东西。

我已经公开了 JMX 指标,但那里似乎没有任何东西可以满足我的需求。我可以看到“放入”rocksDB 的总数,但看不到键的总数。我的应用程序是 spring boot,我想通过 prometheus 公开指标。

有没有人解决了这个问题或有任何帮助的想法?

4

1 回答 1

2

您可以通过使用它访问 KTable 的底层状态存储来获取每个分区中的大致计数,KeyValueStore#approximateNumEntries()然后将此计数导出到 prometheus(每个分区都有一个计数)。

要访问底层状态存储,您可以使用低级处理器 APIKeyValueStore通过每个 StreamTask 中的每个 ProcessorContext (对应于一个分区)来访问一个。只需添加一个KStream#transformValues()到您的拓扑:

kStream
        ...
        .transformValues(ExtractCountTransformer::new, "your_ktable_name")
        ...

并在 ExtractCountTransformer 中提取计数到普罗米修斯:

@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

    private KeyValueStore<String, String> yourKTableKvStore;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        //extract count to prometheus
        log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
        yourKTableKvStore.approximateNumEntries();
        return value;
    }

    @Override
    public void close() {

    }
}
于 2020-04-01T09:27:02.250 回答