1

在使用 Flink 使用 Kinesis 流中的记录时,我无法理解如何保留事件的顺序。我们的设置如下所示:

  • 带有 8 个分片的 Kinesis 流
  • 分片键是产生事件的用户的 userId

在 Flink 中,我们使用 Table API 来使用 Kinesis 流,进行一些处理并将事件写入(自定义)同步 HTTP 接收器。期望的结果是每个分片处理子任务一个接一个地将事件写入接收器,等待接收器返回,然后再写入下一个事件。为了测试这一点,我们让 sink 函数Thread.sleep()在返回前随机执行几秒钟。查看日志输出,我们现在可以看到:

13:00:06.120 c.s.d.a.p.p.f.sinks.HttpSinkFunction - BLOCKING 802719369 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
13:00:06.476 c.s.d.a.p.p.f.sinks.HttpSinkFunction - 1973378384 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}

第一行来自一个阻塞接收器,第二行来自非阻塞接收器。两个事件都来自同一个用户(= 同一个分片,请参阅 JSON 对象中的 shardId),并且彼此处理了几毫秒,即使第一个接收器在写入日志行后会休眠 10 秒。这也意味着结果将无序到达 HTTP 端点。

我研究了有关并行性和背压的 Flink 文档,但我仍然不确定如何实现所需的行为。是否可以一次将输出写入每个分片的一个接收器函数,以便在接收器响应缓慢时延迟分片的完整处理?

更新:有关设置的更多信息

首先,我们定义一个输入表(使用 Kinesis 连接器)和一个输出表(使用我们的自定义 http 连接器)。然后我们创建一个语句集,向其中添加几个插入 SQL 并执行该语句集。代码看起来很像这样(extractionSql作为查询字符串列表,见下​​文):

StatementSet statementSet = tableEnv.createStatementSet();
for (String extractionSql : extractionSqls) {
    statementSet.addInsertSql(extractionSql);
}
statementSet.execute();

插入 SQL 看起来都非常相似,基本上只是从输入事件中提取属性,还涉及一个窗​​口函数(翻转窗口)。示例 SQL 如下所示:

INSERT INTO output_table
SELECT userId, 'replace', eventTime, MAP['heroLevel',payload['newLevel']], shardId
FROM input_table
WHERE `eventType` = 'LEVELUP'

这个想法是,每当一个“LEVELUP”类型的事件到达时,我们都想向我们的 API 发送一个 http 请求。由于稍后的处理方式,我们需要确保单个用户的事件按顺序同步发送。

在 Flink 仪表板中,生成的图表如下所示:

在此处输入图像描述

4

1 回答 1

2

鉴于您的要求,我能看到的唯一方法是将每个用户的所有结果放在一起,以便它们由接收器的同一实例编写。

也许将其重写为您按时间戳排序的用户 ID 上的一个大型联接(或联合)。或者您可以将 SQL 查询的结果转换为您通过用户 ID 键入的数据流,然后在您的自定义接收器中实现一些缓冲和排序。

于 2022-02-16T21:27:42.803 回答