5

我们正在接收来自 no 的事件。独立数据源,因此到达我们的 Flink 拓扑(通过 Kafka)的数据将是无序的。

我们在 Flink 拓扑中创建 1 分钟的事件时间窗口,并在源操作符处生成事件时间水印(当前事件时间 - 某个阈值(30 秒))。

如果一些事件在设置的阈值之后到达,这些事件将被简单地忽略(在我们的例子中是可以的,因为属于该分钟的大多数事件已经到达并在相应的窗口中得到处理)。

现在,问题是如果程序崩溃(无论出于何种原因)然后从最后一个成功的检查点再次恢复,无序到达的事件将触发过去(已经处理的)窗口的执行(只有极少数事件在该窗口)覆盖上一个结果。该窗口的计算。

如果 Flink 有检查点事件时间水印,则不会发生此问题。

所以,我想知道是否有办法在 Flink 中强制执行事件时间水印的检查点......

4

2 回答 2

2

虽然这是一个老问题,但我遇到了同样的问题。应用程序正在重新启动,并且带有事件时间窗口的连接函数不再触发,因为来自其中一个流的事件在崩溃之前完成。加入可以恢复状态,但由于其中一个流不再有水印,因此事件在重新启动后永远不会加入。

我找到的解决方案是在源操作符之后为最新的水印创建一个检查点。由于没有 UDF 来保存水印的快照,因此我必须创建自己的运算符,该运算符不会更改事件(身份函数)并将最新的水印保存为其状态。当 Flink 从崩溃中恢复时,WatermarkStreamOperator.initializeState()会发出最后一个水印检查点ListState<Long> latestWatermark在线processWatermark(new Watermark(maxWatermark))。然后可以触发与事件时间窗口的加入。

public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1L;
    private ListState<Long> latestWatermark;
    public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    @Override
    public void initializeState(StateInitializationContext context) throws Exception { System.out.println("WatermarkStreamOperator.initializeState");
        super.initializeState(context);
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("latest-watermark", Long.class);
        latestWatermark = context.getOperatorStateStore().getListState(descriptor);
        List<Long> watermarkList = new ArrayList<>();
        latestWatermark.get().forEach(watermarkList::add);
        Long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
        if (!maxWatermark.equals(Long.valueOf(0l))) {
            System.out.println("watermarkList recovered max: " + maxWatermark);
            processWatermark(new Watermark(maxWatermark));
        }
    }
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element);
    }
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        System.out.println("processing watermark: " + mark.getTimestamp()); latestWatermark.update(Arrays.asList(mark.getTimestamp()));
        super.processWatermark(mark);
    }
}

以及操作员的身份 UDF:

public interface WatermarkFunction<T> extends Function, Serializable {
    T process(T value) throws Exception;
}

最后我用.transform()来调用我WatermarkStreamOperator的 with MyTupleWatermarkFunc

DataStream<Tuple2<String, Integer>> dataStream = env
                .addSource(new MySource(sentence))
                .transform("myStatefulWatermarkOperator",
                        TypeInformation.of(String.class),
                        new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
                ...
                ...
    public class MyTupleWatermarkFunc implements WatermarkFunction<String> {
        private static final long serialVersionUID = 1L;
        @Override
        public String process(String value) throws Exception {
            return value;
        }
    }

这是我为此创建的单元和集成测试https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/ org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java

于 2021-06-25T21:08:39.953 回答
1

我认为最简单的解决方案是ProcessFunction在窗口运算符之后注入一个。

可以通过其对象ProcessFunction访问当前水印,Context并可以将其存储在联合运算符状态。如果发生故障,ProcessFunction则从其状态中恢复水印并过滤所有时间戳小于水印的记录(时间戳也可以通过Context对象访问)。

于 2018-03-02T08:55:04.803 回答