Kafka Streams 提供 2 种语义:emit-on-update 和 emit-on-window-close。
KIP-557是关于基于数据的字节数组比较添加变化时发出的语义。它已在 Kafka Streams 2.6 中实现,然后由于“潜在的数据丢失”而被删除。
尽管如此,我还是通过使用 Kafka Streams DSL 开发了一个变化时发出语义的实现。
这个想法是将具有更新时发出语义的 KStream 转换为具有更改时发出语义的 KStream。您可以在您提供的源 Kstream 上使用此实现来创建 KTable,也可以在应用后在 KTable 上使用此实现.toStream()
。
这个实现隐式地创建了一个状态存储,其中的值包含 KStream 数据和一个标志,指示是否应该发出更新。此标志在聚合操作中设置,并基于Object#equals
用于比较。但是您可以更改实现以使用Comparator
.
这是withEmitOnChange
改变 KStream 语义的方法。您可能必须为EmitOnChangeState
数据结构指定一个 serde(见下文)。
public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
return streams
.groupByKey()
.aggregate(
() -> (EmitOnChangeState<V>) null,
(k, data, state) -> {
if (state == null) {
return new EmitOnChangeState<>(data, true);
} else {
return state.merge(data);
}
}
)
.toStream()
.filter((k, state) -> state.shouldEmit)
.mapValues(state -> (V) state.data);
}
这是存储在状态存储中并用于检查是否应该发出更新的数据结构。
public static class EmitOnChangeState<T> {
public final T data;
public final boolean shouldEmit;
public EmitOnChangeState(T data, boolean shouldEmit) {
this.data = data;
this.shouldEmit = shouldEmit;
}
public EmitOnChangeState<T> merge(T newData) {
return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(data, shouldEmit);
}
}
用法:
KStream<ProductKey, Product> products = builder.stream("product-topic");
withEmitOnChange(products)
.to("out-product-topic"); // output topic with emit-on-change semantic