0

我正在为使用 match_recognize 的 Flink SQL 语句编写单元测试。我正在设置这样的测试数据

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

我有两个问题,

  • 如何将 event_time 指定为水印字段?(表示行时间)
  • 不太重要,给创建的表起一个有意义的名字?

FLINK 版本:1.11

4

1 回答 1

4

您遇到了 Table API 的当前限制:无法结合forValues方法定义水印和行时间属性;你需要一个连接器。有几个选项可以解决它:

1.使用csv您堆叠的连接器VALUES,如本例所示。

2.使用内置的DataGen 连接器。由于您正在为 CEP 进行单元测试,我想您希望对生成的数据进行一定程度的控制,因此这可能不是一个可行的选择。无论如何,我想我会提到它。

注意:使用 SQL DDL 语法是从 Flink 1.10 创建表的推荐方式。这将使您尝试做的两件事(定义水印和命名您的表格)更加简单:

tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING \n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

水印被声明为计算列,您可以选择使用多种水印策略。请查看此文档页面以获取更多详细信息。

于 2020-09-28T08:34:43.867 回答