我有 2 个数据流,我希望能够在 1 个月的窗口内加入它们。当我有实时数据时,使用KStream和join一切都很有趣且超级简单。我做了这样的事情;
KStream<String, GenericRecord> stream1 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());
KStream<String, GenericRecord> stream2 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());
long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
KStream<String, GenericRecord> joinStream = stream1.join(stream2,
new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
....
....
....
return jonnedRecord;
}
}, JoinWindows.of(joinWindowSizeMs));
当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新执行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它不考虑时间差(其中它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及如何更改和操纵这个时间,以便我可以正确运行我的数据重播,我的意思是重新插入过去 6 个月的数据,每条记录需要一个月的时间窗口并基于该记录加入。
这个问题与How to manage Kafka KStream to Kstream windowed join不重复?,在那里我问我如何才能根据时间窗口加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,所以如果你想进行数据重放并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入,并将与一些其他数据加入它,这些数据实际上是今天不应该的。