5

我在 Flink 文档和 Flink 官方博客中看到过多次提到基于唯一键的动态表的“upsert 模式” 。但是,我没有看到任何关于如何在动态表上启用此模式的示例/文档。

例子:

  • 博文

    通过更新模式在流上定义动态表时,我们可以在表上指定唯一的键属性。在这种情况下,对键属性执行更新和删除操作。更新模式如下图所示。

  • 文档

    转换为upsert 流的动态表需要一个(可能是复合的)唯一键

所以我的问题是:

  • 如何在 Flink 中的动态表上指定唯一键属性?
  • 如何将动态表置于更新/更新/“替换”模式,而不是附加模式?
4

3 回答 3

8

链接的资源描述了两种不同的场景。

  • 博客文章讨论了upsertDataStream -> Table转换。
  • 文档描述了反向 upsertTable -> DataStream转换。

以下讨论基于 Flink 1.4.0(2018 年 1 月)。

插入DataStream -> Table转换

通过对键的 upsert将 a 转换DataStream为 aTable不是本机支持的,而是在路线图上。同时,您可以使用附加Table和具有用户定义聚合函数的查询来模拟此行为。

如果您有一个跟踪用户登录Table Logins的架构的追加,您可以将其转换为使用以下查询键入的 upsert:(user, loginTime, ip)Tableuser

SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user

LAST_VAL聚合函数是用户自定义的聚合函数,总是返回最新的附加值。

尽管提供了更简洁的 API,但对 upsert 转换的本机支持DataStream -> Table基本上以相同的方式工作。

插入Table -> DataStream转换

不支持将 aTable转换为 upsert 。DataStream这也正确反映在文档中:

请注意,将动态表转换为 DataStream 时仅支持追加和收回流。

我们故意选择不支持 upsertTable -> DataStream转换,因为DataStream只有知道关键属性时才能处理 upsert。这些取决于查询,并不总是很容易识别。开发人员有责任确保正确解释关键属性。不这样做会导致错误的程序。为避免出现问题,我们决定不提供 upsertTable -> DataStream转换。

相反,用户可以将 aTable转换为 retraction DataStream。此外,我们支持UpsertTableSink将 upsert 写入DataStream外部系统,例如数据库或键值存储。

于 2018-02-01T09:12:19.527 回答
2

更新:从 Flink 1.9 开始,如果我们使用 Blink 规划器(这是自 Flink 1.11 以来的默认设置),它是内置聚合函数LAST_VALUE的一部分。

假设Logins上面 Fabian Hueske 的响应中提到的表存在,我们现在可以将其转换为 upsert 表,如下所示:

SELECT 
  user, 
  LAST_VALUE(loginTime), 
  LAST_VALUE(ip) 
FROM Logins 
GROUP BY user
于 2021-01-24T20:01:53.690 回答
0

Flink 1.8 仍然缺乏这样的支持。期望将来添加这些功能:1)LAST_VAL 2)Upsert Stream <-> 动态表。

附言。LAST_VAL() 似乎不可能在 UDTF 中实现。聚合函数不提供附加的事件/过程时间上下文。阿里巴巴的 Blink 提供了 LAST_VAL 的替代实现,但它需要另一个字段来提供订单信息,而不是直接在 event/proc time 上。这使得 sql 代码很难看。( https://help.aliyun.com/knowledge_detail/62791.html )

我的 LAST_VAL (eg.get latest ip) 解决方案类似于:

  1. concat(ts, ip) 作为ordered_ip
  2. MAX(ordered_ip) 作为ordered_ip
  3. 提取(ordered_ip)作为ip
于 2019-11-26T02:47:39.043 回答