仅一次运行 3 个 Kafka Streams 实例,但在重新启动其中一个流实例时遇到数据丢失(另外 2 个正在重新平衡)。如果我快速重新启动实例(在 内session.timeout.ms
),而其他 2 没有重新平衡,一切都按预期工作。
- 输入和输出主题由 6 个分区创建。
- 运行 3 个 Kafka 代理。
- 在循环中使用单个 python 生产者生成数据 (
acks='all'
)。 - 使用配置的单个 Kafka Connect 将数据输出到 SQL
consumer.override.isolation.level=read_committed
我期望聚合数据与我的 python 循环的输出具有相同的计数。只要 Kafka Streams 不进入重新平衡状态,这就可以正常工作。
简而言之,流实例会:
- 收集会话数据,并更新会话状态。
- 然后使用窗口聚合对会话状态的增量更新进行重新分区和求和。
通过我自己的调试输出,我倾向于认为问题与转移聚合状态有关:
- 作为会话 X 的更新的记录 A 将 0 添加到聚合中。
- 聚合的输出现在是 6
- 作为对会话 X 的更新的记录 B 将 1 添加到聚合中。
- 聚合的输出现在是 7
- 再平衡
- 对会话 X(可能是也可能不是重播或记录 A)的更新正在将 0 添加到聚合中。
- 聚合的输出现在是 6
代码的简化和剥离版本:(不是真正的 Java 开发人员,对非最佳语法感到抱歉)
public static void main(String[] args) throws Exception {
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final StoreBuilder<KeyValueStore<MediaKey, SessionState>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(SESSION_STATE_STORE),
mediaKeySerde,
sessionStateSerde
);
builder.addStateStore(storeBuilder);
KStream<String, IncomingData> incomingData = builder.stream(
SESSION_TOPIC, Consumed.with(Serdes.String(), mediaDataSerde));
KGroupedStream<MediaKey, AggregatedData> mediaData = incomingData
.transform(new SessionProcessingSupplier(SESSION_STATE_STORE), SESSION_STATE_STORE)
.selectKey(...)
.groupByKey(...);
KTable<Windowed<MediaKey>, AggregatedData> aggregatedMedia = mediaData
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
new Initializer<AggregatedData>() {...},
new Aggregator<MediaKey, AggregatedData, AggregatedData>() {
@Override
public AggregatedData apply(MediaKey key, AggregatedData input, AggregatedData aggregated) {
// ... Add stuff to "aggregated"
return aggregated
}
},
Materialized.<MediaKey, AggregatedData, WindowStore<Bytes, byte[]>>as("aggregated-media")
.withValueSerde(aggregatedDataSerde)
);
aggregatedMedia.toStream()
.map(new KeyValueMapper<Windowed<MediaKey>, AggregatedData, KeyValue<MediaKey, PostgresOutput>>() {
@Override
public KeyValue<MediaKey, PostgresOutput> apply(Windowed<MediaKey> mediaidKey, AggregatedData data) {
// ... Some re-formatting and then
return new KeyValue<>(mediaidKey.key(), output);
}
})
.to(POSTGRES_TOPIC, Produced.with(mediaKeySerde, postgresSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
// Shutdown hook
}
和:
public class SessionProcessingSupplier implements TransformerSupplier<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> {
@Override
public Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> get() {
return new Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>>() {
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.stateStore = (KeyValueStore<String, Processing.SessionState>) context.getStateStore(sessionStateStoreName);
}
Override
public KeyValue<String, Processing.AggregatedData> transform(String sessionid, Processing.IncomingData data) {
Processing.SessionState state = this.stateStore.get(sessionid);
// ... Update or create session state
return new KeyValue<String, Processing.AggregatedData>(sessionid, output);
}
};
}
}