我正在使用 Kafka Streams 版本 0.10.0.1,并试图在流中找到最小值。
传入的消息来自一个名为 kafka-streams-topic 的主题,并且有一个键,值是一个 JSON 有效负载,如下所示:
{"value":2334}
这是一个简单的有效负载,但我想找到这个 JSON 的最小值。
传出消息只是一个数字:2334
并且密钥也是消息的一部分。
因此,如果传入的主题得到:
key=1, value={"value":1000}
名为 min-topic 的传出主题将得到
key=1,value=1000
另一条消息传来:
key=1, value={"value":100}
因为这是同一个键,所以我现在想生成一条带有 key=1 value=100 的消息,因为它现在小于第一条消息
现在让我们说我们得到了:
key=2 value=99
将产生一条新消息,其中:
key=2 and value=99 but the key=1 and associated value shouldn't change.
此外,如果我们收到消息:
key=1 value=2000
不应产生任何消息,因为此消息大于当前值 100
这可行,但我想知道这是否符合 API 的意图:
public class MinProcessor implements Processor<String,String> {
private ProcessorContext context;
private KeyValueStore<String, Long> kvStore;
private Gson gson = new Gson();
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
kvStore = (KeyValueStore) context.getStateStore("Counts");
}
@Override
public void process(String key, String value) {
Long incomingPotentialMin = ((Double)gson.fromJson(value, Map.class).get("value")).longValue();
Long minForKey = kvStore.get(key);
System.out.printf("key: %s incomingPotentialMin: %s minForKey: %s \n", key, incomingPotentialMin, minForKey);
if (minForKey == null || incomingPotentialMin < minForKey) {
kvStore.put(key, incomingPotentialMin);
context.forward(key, incomingPotentialMin.toString());
context.commit();
}
}
@Override
public void punctuate(long timestamp) {}
@Override
public void close() {
kvStore.close();
}
}
这是实际运行处理器的代码:
public class MinLauncher {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
builder.addSource("source", "kafka-streams-topic")
.addProcessor("process", () -> new MinProcessor(), "source")
.addStateStore(countStore, "process")
.addSink("sink", "min-topic", "process");
KafkaStreams streams = new KafkaStreams(builder, KafkaStreamsProperties.properties("kafka-streams-min-poc"));
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}