我很难理解 Kafka Streams 中的 Windowing 是如何工作的。结果似乎与我迄今为止阅读和理解的内容不一致。
我创建了一个带有支持主题的 KSQL 流。KSQL SELECT 语句中的“列”之一已被指定为主题的 TIMESTAMP。
CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;
my-stream-topic 中的记录按键 (PARTITION_KEY) 分组,并使用跳跃窗口进行窗口化
val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed)
.groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
.windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))
记录通过以下方式汇总
val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
.aggregate(
new Initializer[TopicStats] {<code omitted>}},
new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
.withValueSerde(new JSONSerde[TopicStats])
)
val topicStats: KStream[String, TopicValueStats] = dataAgg
.toStream()
.map( <code omitted for brevity>)
然后我通过打印到控制台
dataAgg.print()
topicStats.print()
组中的第一个窗口转换为 7:00 - 7:05
当我通过控制台消费者检查 my-stream-topic 中的记录时,我看到有 2 条记录应该落在上述窗口中。但是,只有其中 1 个被聚合器拾取。
我认为 dataAgg 窗口化 KTable 将包含 1 条记录作为分组键,但聚合将使用 2 条记录来计算聚合。打印的合计值不正确。
我错过了什么?