
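I've run into this issue in several pipelines and haven't been able to find an answer. When a pipeline assigns a watermark strategy with a timestamp assigner for monotonous or bounded-out-of-orderness timestamps, the timestamps are extracted correctly and keep advancing, but the watermark stays stuck at -9223372036854775808. As a sanity check I ran the event_time_timer.py example from the pyflink library, but on inspection the watermark never moves away from -9223372036854775808 in process_element, nor from 9223372036854775807 in on_timer.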

Here is the code for the process function and the timestamp assigner:

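from pyflink.common import Duration, Time, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Sum(KeyedProcessFunction):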

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[2]
        self.state.update(current)

        # register an event time timer 2 seconds later
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield ctx.get_current_key(), self.state.value(), timestamp


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return int(value[0])

Here is the main function:

def event_timer_timer_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_parallelism(1)
    env.get_config().set_auto_watermark_interval(1)

    ds = env.from_collection(
        collection=[
            (1000, 'Alice', 110.1),
            (4000, 'Bob', 30.2),
            (3000, 'Alice', 20.0),
            (2000, 'Bob', 53.1),
            (5000, 'Alice', 13.1),
            (3000, 'Bob', 3.1),
            (7000, 'Bob', 16.1),
            (10000, 'Alice', 20.1)
        ],
        type_info=Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()]))
    ds = ds.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
                         .with_timestamp_assigner(MyTimestampAssigner()))


    # apply the process function onto a keyed stream
    ds.key_by(lambda value: value[1]) \
      .process(Sum()) \
      .print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    event_timer_timer_demo()

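My main pipeline has the same problem no matter which watermark strategy I use. Shouldn't the watermark be tied to the timestamps and be visible in the ProcessFunction.Context?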
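As a rough illustration of how the watermark is meant to follow the timestamps, here is a pure-Python model (not the PyFlink API) of the arithmetic that, as far as I understand it, Flink's BoundedOutOfOrdernessWatermarks generator uses: it tracks the largest timestamp seen and, on each periodic emit, outputs that maximum minus the bound minus 1 ms, starting from Long.MIN_VALUE. The class name BoundedOutOfOrdernessModel is hypothetical, and unlike Flink this sketch emits after every event instead of once per auto-watermark interval.

```python
LONG_MIN = -(2 ** 63)  # Java Long.MIN_VALUE, the "no watermark yet" value


class BoundedOutOfOrdernessModel:
    """Hypothetical pure-Python model of a bounded-out-of-orderness
    watermark generator (illustration only, not the PyFlink API)."""

    def __init__(self, bound_ms: int):
        self.bound_ms = bound_ms
        # Start so that the first periodic emit yields exactly LONG_MIN.
        self.max_timestamp = LONG_MIN + bound_ms + 1

    def on_event(self, timestamp: int) -> None:
        # Only the largest timestamp seen so far matters.
        self.max_timestamp = max(self.max_timestamp, timestamp)

    def on_periodic_emit(self) -> int:
        # The watermark trails the max timestamp by the bound, minus 1 ms.
        return self.max_timestamp - self.bound_ms - 1


# Timestamps from the collection in the question, bound = 2 seconds.
events = [1000, 4000, 3000, 2000, 5000, 3000, 7000, 10000]
gen = BoundedOutOfOrdernessModel(bound_ms=2000)
initial = gen.on_periodic_emit()
watermarks = []
for ts in events:
    gen.on_event(ts)
    watermarks.append(gen.on_periodic_emit())

print(initial)     # -9223372036854775808
print(watermarks)  # [-1001, 1999, 1999, 1999, 2999, 2999, 4999, 7999]
```

So with this data the watermark would only move off -9223372036854775808 once a periodic emit actually runs after some records have been seen, and it would reach 7999 after the last record.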


1 Answer


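This problem is mainly caused by the source being a BOUNDED source: the whole Flink job finishes executing before the WatermarkStrategy gets a chance to emit a watermark. When a bounded input is exhausted, Flink emits a final watermark of Long.MAX_VALUE (9223372036854775807), which fires all pending event-time timers. That is why on_timer sees that value, while process_element only ever sees Long.MIN_VALUE (-9223372036854775808).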
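The effect described above can be reproduced with a toy timeline model. This is a simplified sketch under stated assumptions, not PyFlink code: all records share one key, the hypothetical simulate() function registers a timer at timestamp + 2000 for each record (like Sum.process_element), and periodic_emits=False stands for the case where the job ends before any periodic watermark is emitted, so only the final Long.MAX_VALUE watermark arrives.

```python
LONG_MIN = -(2 ** 63)   # Long.MIN_VALUE: no watermark received yet
LONG_MAX = 2 ** 63 - 1  # Long.MAX_VALUE: final watermark at end of input


def simulate(events, bound_ms=2000, periodic_emits=True):
    """Hypothetical single-key model. Returns the watermarks process_element
    would observe and the (timer, watermark) pairs on_timer would observe."""
    max_ts = LONG_MIN + bound_ms + 1
    watermark = LONG_MIN
    timers = set()
    seen_in_process = []
    fired = []

    def advance(new_wm):
        nonlocal watermark
        if new_wm <= watermark:
            return  # watermarks never move backwards
        watermark = new_wm
        # Fire every registered timer that the watermark has now passed.
        for t in sorted(x for x in timers if x <= watermark):
            timers.discard(t)
            fired.append((t, watermark))

    for ts in events:
        seen_in_process.append(watermark)  # what process_element observes
        max_ts = max(max_ts, ts)
        timers.add(ts + 2000)  # like register_event_time_timer(ts + 2000)
        if periodic_emits:
            advance(max_ts - bound_ms - 1)  # assume a periodic emit runs here

    advance(LONG_MAX)  # bounded input exhausted: final watermark
    return seen_in_process, fired


events = [1000, 4000, 3000, 2000, 5000, 3000, 7000, 10000]
seen, fired = simulate(events, periodic_emits=False)
print(set(seen))                # {-9223372036854775808}
print({wm for _, wm in fired})  # {9223372036854775807}
```

With periodic_emits=False every timer fires with watermark 9223372036854775807, which matches what the question observes. With periodic emits, most timers fire as the watermark passes them, and only the timers beyond the last regular watermark wait for the end of input.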

Instead of using fromCollection, you can generate the records yourself with a source like the following example:

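import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Data-generating source function. */
public final class Generator
        implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {

    private static final long serialVersionUID = -2819385275681175792L;

    private final int numKeys;
    private final int idlenessMs;
    private final int recordsToEmit;

    private volatile int numRecordsEmitted = 0;
    private volatile boolean canceled = false;

    private transient ListState<Integer> state = null;

    Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
        this.numKeys = numKeys;
        this.idlenessMs = idlenessMs;
        // One record per key every idlenessMs, for durationSeconds in total.
        this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
    }

    @Override
    public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (numRecordsEmitted < recordsToEmit) {
            synchronized (ctx.getCheckpointLock()) {
                for (int i = 0; i < numKeys; i++) {
                    ctx.collect(Tuple2.of(i, numRecordsEmitted));
                    numRecordsEmitted++;
                }
            }
            Thread.sleep(idlenessMs);
        }

        // Keep the source running after the last record instead of returning,
        // so the job does not finish as a bounded job would.
        while (!canceled) {
            Thread.sleep(50);
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Checkpoint how many records have been emitted so far.
        state.clear();
        state.add(numRecordsEmitted);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("state", Integer.class));
        // On restore, resume from the checkpointed record count.
        for (Integer count : state.get()) {
            numRecordsEmitted += count;
        }
    }
}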
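To make the generator's output concrete, here is a Python model of the records its run() loop emits before it starts idling. It is an illustrative sketch only: the function name generator_records is hypothetical, and the sleeps and the keep-alive loop are left out. Note that the second field is a running counter rather than an event timestamp, so a timestamp assigner is still needed if it is to drive event time.

```python
def generator_records(num_keys: int, idleness_ms: int, duration_seconds: int):
    """Hypothetical model of the records emitted by the Java Generator
    before it starts idling (sleeps and keep-alive loop omitted)."""
    # Same integer arithmetic as the Java constructor (positive ints only).
    records_to_emit = ((duration_seconds * 1000) // idleness_ms) * num_keys
    emitted = 0
    records = []
    while emitted < records_to_emit:
        # One burst: one record per key; Java then sleeps idlenessMs.
        for key in range(num_keys):
            records.append((key, emitted))
            emitted += 1
    return records


records = generator_records(num_keys=2, idleness_ms=100, duration_seconds=1)
print(len(records))  # 20
print(records[:4])   # [(0, 0), (1, 1), (0, 2), (1, 3)]
```

With these parameters the Java constructor computes ((1 * 1000) / 100) * 2 = 20 records, emitted in bursts of one record per key, with a pause of idlenessMs between bursts during which periodic watermarks can be emitted.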

answered 2021-10-21T03:53:52.267