2

我目前有一些使用聚合构建 KTable 的代码:

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
);
        

一旦为单个键接收并聚合了给定数量的消息,我想将最新的聚合状态推送到另一个主题,然后删除表中的键。

我显然可以使用一个普通的 Kafka 生产者,并且有类似的东西:

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        if (count > threshold) {
                producer.send(new ProducerRecord<String,String>("output-topic", 
                    key, aggregate));
                return null;
        }
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
);
        

但我正在寻找一种更“流”的方法。

有什么提示吗?

4

1 回答 1

0

我认为这里最好的解决方案是将聚合返回到流中,然后在将其发送到主题之前过滤所需的值。

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
)
.toStream()
.filter((key, value) -> (value.count > threshold)
.to("output-topic");

编辑:我刚刚意识到您想在序列化之前执行此操作。我认为这样做的唯一方法是使用转换器或处理器而不是聚合。

在那里,您可以访问 StateStore 而不是 KTable。它还使您可以访问context.forward(),让您以任何您想要的方式向下游转发消息。

一些伪代码来展示如何使用转换来完成

@Override
public Transformer<String, String, KeyValue<String, String>> get() {
    return new Transformer<String, String, KeyValue<String, String>>() {

        private KeyValueStore<String, String> stateStore;
        private ProcessorContext context;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            stateStore = (KeyValueStore<String, String>) context.getStateStore(STATE_STORE_NAME);
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            String prevAggregation = stateStore.get(key);
            //use prevAggregation and value to calculate newAggregation here:
            //...
            if (newAggregation.length() > threshold) {
                context.forward(key, newAggregation);
                stateStore.delete(key);
            } else {
                stateStore.put(key, newAggregation);
            }
            return null; // transform ignore null
        }

        @Override
        public void close() {
            // Note: The store should NOT be closed manually here via `stateStore.close()`!
            // The Kafka Streams API will automatically close stores when necessary.
        }
    };
}
于 2022-02-23T08:55:59.600 回答