在使用 Flink 使用 Kinesis 流中的记录时,我无法理解如何保留事件的顺序。我们的设置如下所示:
- 带有 8 个分片的 Kinesis 流
- 分片键是产生事件的用户的 userId
在 Flink 中,我们使用 Table API 来使用 Kinesis 流,进行一些处理并将事件写入(自定义)同步 HTTP 接收器。期望的结果是每个分片处理子任务一个接一个地将事件写入接收器,等待接收器返回,然后再写入下一个事件。为了测试这一点,我们让 sink 函数Thread.sleep()
在返回前随机执行几秒钟。查看日志输出,我们现在可以看到:
13:00:06.120 c.s.d.a.p.p.f.sinks.HttpSinkFunction - BLOCKING 802719369 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
13:00:06.476 c.s.d.a.p.p.f.sinks.HttpSinkFunction - 1973378384 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
第一行来自一个阻塞接收器,第二行来自非阻塞接收器。两个事件都来自同一个用户(= 同一个分片,请参阅 JSON 对象中的 shardId),并且彼此处理了几毫秒,即使第一个接收器在写入日志行后会休眠 10 秒。这也意味着结果将无序到达 HTTP 端点。
我研究了有关并行性和背压的 Flink 文档,但我仍然不确定如何实现所需的行为。是否可以一次将输出写入每个分片的一个接收器函数,以便在接收器响应缓慢时延迟分片的完整处理?
更新:有关设置的更多信息
首先,我们定义一个输入表(使用 Kinesis 连接器)和一个输出表(使用我们的自定义 http 连接器)。然后我们创建一个语句集,向其中添加几个插入 SQL 并执行该语句集。代码看起来很像这样(extractionSql
作为查询字符串列表,见下文):
StatementSet statementSet = tableEnv.createStatementSet();
for (String extractionSql : extractionSqls) {
statementSet.addInsertSql(extractionSql);
}
statementSet.execute();
插入 SQL 看起来都非常相似,基本上只是从输入事件中提取属性,还涉及一个窗口函数(翻转窗口)。示例 SQL 如下所示:
INSERT INTO output_table
SELECT userId, 'replace', eventTime, MAP['heroLevel',payload['newLevel']], shardId
FROM input_table
WHERE `eventType` = 'LEVELUP'
这个想法是,每当一个“LEVELUP”类型的事件到达时,我们都想向我们的 API 发送一个 http 请求。由于稍后的处理方式,我们需要确保单个用户的事件按顺序同步发送。
在 Flink 仪表板中,生成的图表如下所示: