1

试图实现一个 Flink 作业来读取 Kafka 流和聚合会话,由于某种原因 getResult() 没有被调用。我看到 createAccumulator() 和 add() 被调用,我期待 getResult() 也被调用,以便我可以在目的地接收聚合消息。

        source.keyBy(new KeySelector<GenericRecord, String>() {
                    @Override
                    public String getKey(GenericRecord record) {
                        return record.get("id").toString();
                    }})
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<GenericRecord>() {
                    private static final long serialVersionUID = -4834111073247835189L;
                    private final long maxTimeLag = 300000L;

                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
                        return new Watermark(extractedTimestamp - maxTimeLag);
                    }

                    @Override
                    public long extractTimestamp(GenericRecord element, long previousElementTimestamp) {
                        long ts = 1000 * (long)element.get(("timestamp"));
                        return (ts);
                    }
                })
                .map(new ReduceAttributesMap())
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> e) {
                        return e.f0;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .aggregate(new EventAggregation())
                .addSink(...)

可能是什么问题?我配置错误了吗?感谢你的帮助!

4

1 回答 1

1

AggregateFunction#getResult()仅在窗口完成时调用。在您的情况下,仅在 5 分钟后没有特定键的事件时才会发出窗口。你能在你的数据中确认这种情况确实发生了吗?

您可以尝试减少会话窗口的间隔时间以更轻松地查看它。此外,您的水印分配器看起来很可疑。您可能想使用BoundedOutOfOrdernessTimestampExtractor. 最后,您能否再次检查您的时间提取是否按预期工作?时间戳是否存储为自 1970 年以来的秒数?

于 2019-11-28T10:47:30.213 回答