1

仅一次运行 3 个 Kafka Streams 实例,但在重新启动其中一个流实例时遇到数据丢失(另外 2 个正在重新平衡)。如果我快速重新启动实例(在 内session.timeout.ms),而其他 2 没有重新平衡,一切都按预期工作。

  • 输入和输出主题由 6 个分区创建。
  • 运行 3 个 Kafka 代理。
  • 在循环中使用单个 python 生产者生成数据 ( acks='all')。
  • 使用配置的单个 Kafka Connect 将数据输出到 SQLconsumer.override.isolation.level=read_committed

我期望聚合数据与我的 python 循环的输出具有相同的计数。只要 Kafka Streams 不进入重新平衡状态,这就可以正常工作。

简而言之,流实例会:

  1. 收集会话数据,并更新会话状态。
  2. 然后使用窗口聚合对会话状态的增量更新进行重新分区和求和。

通过我自己的调试输出,我倾向于认为问题与转移聚合状态有关:

  1. 作为会话 X 的更新的记录 A 将 0 添加到聚合中。
  2. 聚合的输出现在是 6
  3. 作为对会话 X 的更新的记录 B 将 1 添加到聚合中。
  4. 聚合的输出现在是 7
  5. 再平衡
  6. 对会话 X(可能是也可能不是重播或记录 A)的更新正在将 0 添加到聚合中。
  7. 聚合的输出现在是 6

代码的简化和剥离版本:(不是真正的 Java 开发人员,对非最佳语法感到抱歉)

public static void main(String[] args) throws Exception {
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    final StoreBuilder<KeyValueStore<MediaKey, SessionState>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(SESSION_STATE_STORE),
            mediaKeySerde,
            sessionStateSerde
    );
    builder.addStateStore(storeBuilder);

    KStream<String, IncomingData> incomingData = builder.stream(
            SESSION_TOPIC, Consumed.with(Serdes.String(), mediaDataSerde));
    KGroupedStream<MediaKey, AggregatedData> mediaData = incomingData
                .transform(new SessionProcessingSupplier(SESSION_STATE_STORE), SESSION_STATE_STORE)
                .selectKey(...)
                .groupByKey(...);

    KTable<Windowed<MediaKey>, AggregatedData> aggregatedMedia = mediaData
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .aggregate(
                        new Initializer<AggregatedData>() {...},
                        new Aggregator<MediaKey, AggregatedData, AggregatedData>() {
                            @Override
                            public AggregatedData apply(MediaKey key, AggregatedData input, AggregatedData aggregated) {
                                // ... Add stuff to "aggregated"
                                return aggregated
                            }
                        },
                        Materialized.<MediaKey, AggregatedData, WindowStore<Bytes, byte[]>>as("aggregated-media")
                                .withValueSerde(aggregatedDataSerde)
               );

    aggregatedMedia.toStream()
            .map(new KeyValueMapper<Windowed<MediaKey>, AggregatedData, KeyValue<MediaKey, PostgresOutput>>() {
                @Override
                public KeyValue<MediaKey, PostgresOutput> apply(Windowed<MediaKey> mediaidKey, AggregatedData data) {
                        // ... Some re-formatting and then
                        return new KeyValue<>(mediaidKey.key(), output);
                }
            })
            .to(POSTGRES_TOPIC, Produced.with(mediaKeySerde, postgresSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    // Shutdown hook
}

和:

public class SessionProcessingSupplier implements TransformerSupplier<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> {
    @Override
    public Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> get() {
        return new Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>>() {
            @Override
            public void init(ProcessorContext processorContext) {
                this.context = processorContext;
                this.stateStore = (KeyValueStore<String, Processing.SessionState>) context.getStateStore(sessionStateStoreName);
            }

            Override
            public KeyValue<String, Processing.AggregatedData> transform(String sessionid, Processing.IncomingData data) {
                Processing.SessionState state = this.stateStore.get(sessionid);
                // ... Update or create session state
                return new KeyValue<String, Processing.AggregatedData>(sessionid, output);
            }
        };
    }
}
4

0 回答 0