3

我很难理解 Kafka Streams 中的 Windowing 是如何工作的。结果似乎与我迄今为止阅读和理解的内容不一致。

我创建了一个带有支持主题的 KSQL 流。KSQL SELECT 语句中的“列”之一已被指定为主题的 TIMESTAMP。

CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;

my-stream-topic 中的记录按键 (PARTITION_KEY) 分组,并使用跳跃窗口进行窗口化

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed) 
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))

记录通过以下方式汇总

val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] {<code omitted>}},
      new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

然后我通过打印到控制台

dataAgg.print()
topicStats.print()

组中的第一个窗口转换为 7:00 - 7:05

当我通过控制台消费者检查 my-stream-topic 中的记录时,我看到有 2 条记录应该落在上述窗口中。但是,只有其中 1 个被聚合器拾取。

我认为 dataAgg 窗口化 KTable 将包含 1 条记录作为分组键,但聚合将使用 2 条记录来计算聚合。打印的合计值不正确。

我错过了什么?

4

1 回答 1

2

KSQL 可以在写入时设置记录时间戳,但是您需要在创建输入流时指定时间戳,而不是在定义输出流时。即,为输入流指定的时间戳将用于在写入时设置记录元数据字段。

这种行为相当不直观,我为此问题开了一张票:https ://github.com/confluentinc/ksql/issues/1367

因此,您需要with(TIMESTAMP='recorded_timestamp')在为问题中显示的查询创建输入流时指定子句。如果这是不可能的,因为您的查询需要对不同的时间戳进行操作,您需要指定第二个查询,将数据复制到新主题中。

CREATE STREAM my_stream_with_ts
    WITH (KAFKA_topic='my-stream-topic-with-ts')
AS select * from my_stream PARTITION BY PARTITION_KEY;

作为替代方案,您可以为 Kafka Streams 应用程序设置自定义时间戳提取器,以从有效负载中提取时间戳。

于 2018-06-01T18:51:01.297 回答