1

我正在使用 Kafka Streams 版本 0.10.0.1,并试图在流中找到最小值。

传入的消息来自一个名为 kafka-streams-topic 的主题,并且有一个键,值是一个 JSON 有效负载,如下所示:

{"value":2334}

这是一个简单的有效负载,但我想找到这个 JSON 的最小值。

传出消息只是一个数字:2334

并且密钥也是消息的一部分。

因此,如果传入的主题得到:

key=1, value={"value":1000}

名为 min-topic 的传出主题将得到

key=1,value=1000

另一条消息传来:

key=1, value={"value":100}

因为这是同一个键,所以我现在想生成一条带有 key=1 value=100 的消息,因为它现在小于第一条消息

现在让我们说我们得到了:

key=2 value=99

将产生一条新消息,其中:

key=2 and value=99 but the key=1 and associated value shouldn't change.

此外,如果我们收到消息:

key=1 value=2000

不应产生任何消息,因为此消息大于当前值 100

这可行,但我想知道这是否符合 API 的意图:

public class MinProcessor implements Processor<String,String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;
    private Gson gson = new Gson();

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        Long incomingPotentialMin = ((Double)gson.fromJson(value, Map.class).get("value")).longValue();
        Long minForKey = kvStore.get(key);
        System.out.printf("key: %s incomingPotentialMin: %s minForKey: %s \n", key, incomingPotentialMin, minForKey);

        if (minForKey == null || incomingPotentialMin < minForKey) {
            kvStore.put(key, incomingPotentialMin);
            context.forward(key, incomingPotentialMin.toString());
            context.commit();
        }
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {
        kvStore.close();
    }
}

这是实际运行处理器的代码:

public class MinLauncher {

    public static void main(String[] args) {    
        TopologyBuilder builder = new TopologyBuilder();

        StateStoreSupplier countStore = Stores.create("Counts")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .build();

        builder.addSource("source", "kafka-streams-topic")
                .addProcessor("process", () -> new MinProcessor(), "source")
                .addStateStore(countStore, "process")
                .addSink("sink", "min-topic", "process");

        KafkaStreams streams = new KafkaStreams(builder, KafkaStreamsProperties.properties("kafka-streams-min-poc"));
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
4

1 回答 1

2

不确定您的确切输入数据和结果是什么(也许您可以用以下信息更新您的问题:您的输入记录是什么?您的输出是什么?产生了什么“额外消息 [] [] [] [您] 不期望“?)。

但是,一些一般性说明(如果需要,可以稍后完善此答案)。

  • 您基于键进行计算,因此您应该期望每个键都有一个结果(不确定您的输入中是否有多个不同的键)。
  • 您发出punctuate()定期调用的数据(以内部跟踪的流时间为基础——即,基于从您的输入记录中提取的时间戳值TimestampExtractor)。因此,您将在被调用时写入写入主题的每个键的当前最小值punctuate(),因此,您可以对每个键进行多个更新,这些更新都附加到您的结果主题。(主题只是附加的,如果你用相同的键写两条消息,你会看到两者——没有覆盖。)
于 2016-12-02T21:37:20.760 回答