2

Kafka Ktable 还流式传输重复更新。

我想处理 Ktable(使用 Kstream.reduce() 创建)更改日志流,即 Ktable 中键值的任何更改。但是,即使将相同的键值对多次发送到 Ktable,它似乎也每次都向下游发送。仅当值更改时,我才需要在键的值中发送更新。

`

groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde())) 
                .reduce(new Reducer<Long>() {   
                    @Override
                    public Long apply(Long t1, Long t2) {
                        return t2;
                    }
                }).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
        {

            sendUpdate(key); 
        });

`

4

2 回答 2

2

这是 的默认行为KTable#toStream(),它将 changelog 主题转换为 a KStream,因此reduce每次上游 reduce 操作员收到消息时,下游操作员都会更新。

您可以使用Processor API归档您想要的行为,在这种情况下,我们使用 KStream.transfomerValues()。

首先注册一个 KeyValueStore 来存储你的最新值:

//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

然后我们创建一个ExtractIfValueChangedTransformer,如果值已经改变,则只返回新消息的值,如果没有,则返回 null:

public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}
于 2020-04-21T16:23:26.083 回答
1

Kafka Streams 提供 2 种语义:emit-on-update 和 emit-on-window-close。

KIP-557是关于基于数据的字节数组比较添加变化时发出的语义。它已在 Kafka Streams 2.6 中实现,然后由于“潜在的数据丢失”而被删除

尽管如此,我还是通过使用 Kafka Streams DSL 开发了一个变化时发出语义的实现。

这个想法是将具有更新时发出语义的 KStream 转换为具有更改时发出语义的 KStream。您可以在您提供的源 Kstream 上使用此实现来创建 KTable,也可以在应用后在 KTable 上使用此实现.toStream()

这个实现隐式地创建了一个状态存储,其中的值包含 KStream 数据和一个标志,指示是否应该发出更新。此标志在聚合操作中设置,并基于Object#equals用于比较。但是您可以更改实现以使用Comparator.

这是withEmitOnChange改变 KStream 语义的方法。您可能必须为EmitOnChangeState数据结构指定一个 serde(见下文)。

public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
    return streams
            .groupByKey()
            .aggregate(
                    () -> (EmitOnChangeState<V>) null,
                    (k, data, state) -> {
                        if (state == null) {
                            return new EmitOnChangeState<>(data, true);
                        } else {
                            return state.merge(data);
                        }
                    }
            )
            .toStream()
            .filter((k, state) -> state.shouldEmit)
            .mapValues(state -> (V) state.data);
}

这是存储在状态存储中并用于检查是否应该发出更新的数据结构。

public static class EmitOnChangeState<T> {
    public final T data;
    public final boolean shouldEmit;
    public EmitOnChangeState(T data, boolean shouldEmit) {
        this.data = data;
        this.shouldEmit = shouldEmit;
    }
    public EmitOnChangeState<T> merge(T newData) {
        return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
        return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
    }
    @Override
    public int hashCode() {
        return Objects.hash(data, shouldEmit);
    }
}

用法:

KStream<ProductKey, Product> products = builder.stream("product-topic");

withEmitOnChange(products)
  .to("out-product-topic"); // output topic with emit-on-change semantic
于 2021-06-11T15:02:40.000 回答