语境
我有一个由 python SQL api 编码的 Flink 作业。它使用来自 Kinesis 的源数据并将结果生成给 Kinesis。我想进行本地测试以确保 Flink 应用程序代码正确。所以我用文件系统连接器模拟了源 Kinesis 和接收器 Kinesis。然后在本地运行测试管道。虽然本地的 flink 作业总是运行成功。但是当我查看接收器文件时。接收器文件始终为空。当我在“Flink SQL Client”中运行代码时也是如此。
这是我的代码:
CREATE TABLE incoming_data (
requestId VARCHAR(4),
groupId VARCHAR(32),
userId VARCHAR(32),
requestStartTime VARCHAR(32),
processTime AS PROCTIME(),
requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/test/json/file.json',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
CREATE TABLE user_latest_request (
groupId VARCHAR(32),
userId VARCHAR(32),
latestRequestTime TIMESTAMP
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/sink',
'format' = 'csv'
)
INSERT INTO user_latest_request
SELECT groupId,
userId,
MAX(requestTime) as latestRequestTime
FROM incoming_data
GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
好奇我在这里做错了什么。
笔记:
- 我正在使用 Flink 1.11.0
- 如果我直接将数据从源转储到接收器而不进行窗口和分组,它工作正常。这意味着源和汇表设置正确。所以似乎问题在于本地文件系统的翻滚和分组。
- 此代码适用于 Kinesis 源和接收器。