4

我有 2 个数据流,我希望能够在 1 个月的窗口内加入它们。当我有实时数据时,使用KStreamjoin一切都很有趣且超级简单。我做了这样的事情;

KStream<String, GenericRecord> stream1 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());

KStream<String, GenericRecord> stream2 =
            builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

    KStream<String, GenericRecord> joinStream = stream1.join(stream2,
            new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
                @Override
                public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
                    final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
                    ....
                    ....
                    ....

                    return jonnedRecord;
                }
            }, JoinWindows.of(joinWindowSizeMs));

当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新执行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它不考虑时间差(其中它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及如何更改和操纵这个时间,以便我可以正确运行我的数据重播,我的意思是重新插入过去 6 个月的数据,每条记录需要一个月的时间窗口并基于该记录加入。

这个问题与How to manage Kafka KStream to Kstream windowed join不重复?,在那里我问我如何才能根据时间窗口加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,所以如果你想进行数据重放并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入,并将与一些其他数据加入它,这些数据实际上是今天不应该的。

4

1 回答 1

4

Kafka 的 Streams API 使用返回的时间戳TimestampExtractor来计算连接。默认情况下,这是记录的嵌入元数据时间戳。(参见http://docs.confluent.io/current/streams/concepts.html#time

默认情况下,KafkaProducer将此时间戳设置为写入时的当前系统时间。(作为替代方案,您可以基于每个主题配置代理,以在代理存储记录时用代理的系统时间覆盖生产者提供的记录时间戳——这提供了“摄取时间”语义。)

因此,这本身不是 Kafka Streams 问题。

有多种选择可以解决这个问题:

  1. 如果您的数据已经在某个主题中,您可以简单地重置您的 Streams 应用程序以重新处理旧数据。为此,您可以使用应用程序重置工具 ( bin/kafka-streams-application-reset.sh)。您还需要在 Streams 应用程序中指定auto.offset.reset策略。earliest查看文档——另外,建议阅读博客文章。

这是最好的方法,因为您不需要再次将数据写入主题。

  1. 如果您的数据不在主题中并且您需要写入数据,则可以通过为每条记录提供时间戳来在应用程序级别显式设置记录时间戳:
KafkaProducer producer = new KafkaProducer(...);
producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value));

因此,如果您摄取旧数据,您可以显式设置时间戳,Kafka Streams 将获取它并相应地计算连接。

于 2017-01-24T00:47:22.157 回答