3

因此,实际上,我已经为此苦苦挣扎了几天。我正在使用来自 4 个主题的记录。我需要通过 TimedWindow 聚合记录。时间到了,我想向接收器主题发送批准的消息或未批准的消息。这可能与卡夫卡流有关吗?

似乎它会将每条记录都归入新主题,即使窗口仍然打开,这真的不是我想要的。

这是简单的代码:

 builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), 
 Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String, 
 FooTriggerMessage>("", Serdes.String(),
       fooTriggerSerde))
 .filter((key, value) -> value.getTriggerEventId() != null)
 .groupBy((key, value) -> value.getTriggerEventId().toString(),
       Serialized.with(Serdes.String(), fooTriggerSerde))

.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))

.aggregate(() -> new BarApprovalMessage(), /* initializer */
       (key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
       Materialized
               .<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
                       storeName) /* state store name */
               .withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(), 
Produced.with(windowedSerde, barApprovalSerde));

截至目前,每条记录都被下沉到传出主题,我只希望它在窗口关闭时发送一条消息,可以这么说。

这可能吗?

4

3 回答 3

5

我回答我自己的问题,如果其他人需要答案。在转换阶段,我使用上下文来创建调度程序。该调度程序采用三个参数。标点的间隔,使用的时间(挂钟或流时间)和供应商(满足时间时调用的方法)。我使用挂钟时间并为每个唯一的窗口键启动了一个新的调度程序。我在 KeyValue 存储中添加每条消息并返回 null。然后,在每 30 秒调用一次的方法中,我检查窗口是否已关闭,并遍历密钥库中的消息,聚合并使用 context.forward 和 context.commit。中提琴!在 30 秒的窗口中收到 4 条消息,产生了 1 条消息。

于 2018-03-04T21:09:49.527 回答
2

您可以使用抑制功能。

来自 Kafka 官方指南:

在此处输入图像描述 https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results

于 2019-08-14T12:45:06.663 回答
0

我遇到了这个问题,但是我解决了这个问题,在固定窗口之后添加了 grace(0) 并使用了 Suppressed API

public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {

        buildAggregateMetricsBySensor(stream)
                .to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));

    }

private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
        return stream
                .map((key, val) -> new KeyValue<>(val.getId(), val))
                .groupByKey(Grouped.with(String(), new SensorDataSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
                .aggregate(SensorAggregateMetricsDTO::new,
                        (String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
                        buildWindowPersistentStore())
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .map((key, value) -> KeyValue.pair(key.key(), value));
    }


    private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
        return Materialized
                .<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
                .withKeySerde(String())
                .withValueSerde(new SensorAggregateMetricsSerde());
    }

在这里你可以看到结果

在此处输入图像描述

于 2020-09-11T17:31:54.820 回答