0

语境

我有一个由 py​​thon SQL api 编码的 Flink 作业。它使用来自 Kinesis 的源数据并将结果生成给 Kinesis。我想进行本地测试以确保 Flink 应用程序代码正确。所以我用文件系统连接器模拟了源 Kinesis 和接收器 Kinesis。然后在本地运行测试管道。虽然本地的 flink 作业总是运行成功。但是当我查看接收器文件时。接收器文件始终为空。当我在“Flink SQL Client”中运行代码时也是如此。

这是我的代码:

CREATE TABLE incoming_data (
        requestId VARCHAR(4),
        groupId VARCHAR(32),
        userId VARCHAR(32),
        requestStartTime VARCHAR(32),
        processTime AS PROCTIME(),
        requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
        WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/test/json/file.json',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )

CREATE TABLE user_latest_request (
        groupId VARCHAR(32),
        userId VARCHAR(32),
        latestRequestTime TIMESTAMP
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/sink',
        'format' = 'csv'
)

INSERT INTO user_latest_request
    SELECT groupId,
           userId,
           MAX(requestTime) as latestRequestTime
    FROM incoming_data
    GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;

好奇我在这里做错了什么。

笔记:

  • 我正在使用 Flink 1.11.0
  • 如果我直接将数据从源转储到接收器而不进行窗口和分组,它工作正常。这意味着源和汇表设置正确。所以似乎问题在于本地文件系统的翻滚和分组。
  • 此代码适用于 Kinesis 源和接收器。
4

2 回答 2

0

您是否启用了检查点?如果您处于 `STREAMING 模式,这是必需的,这似乎是这种情况。见https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/file_sink/

于 2021-11-15T13:33:33.497 回答
0

最可能的原因是正在读取的文件中没有足够的数据来保持作业运行足够长的时间以关闭窗口。您有一个 1 秒长的基于处理时间的窗口,这意味着作业必须运行至少一秒钟才能保证第一个窗口会产生结果。

否则,一旦源用完数据,作业将关闭,无论窗口是否包含未报告的结果。

如果切换到基于事件时间的窗口,那么当文件源用完数据时,它将发送一个值为 MAX_WATERMARK 的最后一个水印,这将触发窗口。

于 2021-11-15T19:58:33.603 回答