问题标签 [flink-table-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
473 浏览

java - 使用 jdbc 连接器将 flink DataStream 接收到具有覆盖的 mysql 接收器

我的用例是

  1. 从 AWS Kinesis 数据流中获取数据并使用 flink 数据流 api 过滤/映射
  2. 使用 StreamTable 环境对数据进行分组和聚合
  3. 使用 JDBC 连接器使用 SQLTableEnvironment 写入 mysql

我能够将我的数据流结果写入 mySQL 表,但由于流式传输它附加了每个新行,而我想覆盖。

0 投票
1 回答
81 浏览

apache-flink - Apache flink 全外连接的错误结果

我有 2 个数据流,它们是从 2 个表创建的,例如:

我想对这两个流执行完全外连接,我使用了这两个流orderRes1.fullOuterJoin(orderRes2 ,$(exp)) 和一个包含完全外连接的 sql 查询,如下所示:

但是,在某些情况下,结果并不正确:假设用户 A 向 B 卖出 3 次价格,然后用户 B 向 A 卖出 2 次,第二次结果是:

7> (true,123,a,300.0,0.0)

7> (true,123,a,300.0,200.0)

10> (true,123,b,0.0,300.0)

10> (true,123,b,200.0,300.0)

第二行和第四行是流的预期结果,但它也会生成第一行和第三行。值得一提的是 coGroup 是另一种解决方案,但我不想在这种情况下使用窗口化,并且非窗口化解决方案只能在有界流(DataSet)中访问。

提示:orderId 和 userId 将在两个流中重复,我想在每个操作中生成 2 行,包含:orderId、userId1、bidTotalPrice、askTotalPrice 和 orderId、userId2、bidTotalPrice、askTotalPrice

0 投票
1 回答
218 浏览

apache-flink - Flink 嵌套类 toDataStream 转换报错

我正在使用 flink 1.13。我正在尝试通过以下方式将表结果转换为数据流,但不断出错。

我得到的错误是:

我也尝试过从 DataStream 到 table 的自定义转换,但是在从 table 转换到 DataStream 时仍然遇到错误。我被卡住了,所以任何帮助表示赞赏。

0 投票
1 回答
260 浏览

java - Flink Table,Create table 数组类型报错“ValidationException”

我创建了一个包含数据类型字段的flink表,报错类型不匹配。想知道如何在flink表中创建一个包含数组类型的临时表。

0 投票
0 回答
195 浏览

apache-flink - Apache Flink 1.13 版将表转换为数据集?

我正在将一些为 Flink 1.5 版编写的遗留 Java 代码转换为 Flink 1.13.1 版。具体来说,我正在使用 Table API。我必须从 CSV 文件中读取数据,执行一些基本的 SQL,然后将结果写回文件。

对于 Flink 1.5 版本,我使用以下代码执行上述操作

为了将上述代码转换为 Flink 1.13.1 版本,我编写了以下代码

但是,BatchTableEnvironment在 Flink 1.13 版本中被标记为“已弃用”。是否有任何替代方法可以转换TableDataset或直接将 a 写入Table文件?

0 投票
1 回答
186 浏览

java - 使用表 API 时如何将 java LocalDateTime 映射到 Flink TIMESTAMP

我的代码是这样的:

然后我在尝试调用 date_format 之后意识到字段 dt 不是 TIMESTAMP 。

然后我更新了代码:

新代码工作正常,但对我来说似乎有点扭曲,因为我必须添加一个基本什么都不做的地图。

所以我的问题是,将 java LocalDateTime 映射到 flink TIMESTAMP 的正确方法是什么?

我正在使用 Flink 1.13.0。

0 投票
0 回答
165 浏览

apache-flink - Flink Sql 未将 RAW('org.apache.avro.util.Utf8', '...') 转换为字符串

我正在从 kafka 流中读取数据,创建 Table 环境并计算平均值并将数据写回 kafka [SIMPLECUSTOMER]。

这在 Flink 1.12.5 中有效。我正在使用 Flink 1.13.2 和 Flink 1.14.0

customerId 被读取为 Avro Generated Java 类中定义的 RAW('org.apache.avro.util.Utf8', '...')。在写回接收器时,我收到以下错误。

org.apache.flink.table.api.ValidationException:注册表'default_catalog.default_database.SIMPLECUSTOMER'的查询结果和接收器的列类型不匹配。

原因:位置 0 的接收器列“customerId”的类型不兼容。

查询架构:[customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL] 接收器架构:[customerId: STRING, age: INT]

接收器表架构:

这是我的接收器代码

尝试将其转换为字符串,但这也不起作用

原因:org.apache.calcite.sql.validate.SqlValidatorException:Cast 函数无法将 RAW('org.apache.avro.util.Utf8', '...') 类型的值转换为 java 中的 VARCHAR(2147483647) 类型.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl。 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) 上的 newInstance(DelegatingConstructorAccessorImpl.java:45)

0 投票
2 回答
124 浏览

apache-flink - Flink 接收器文件系统作为镶木地板 - 保存嵌套数据时出错

我正在尝试将 json 数据转换为镶木地板,这样我就可以使用 Trino 或 presto 进行查询。示例 JSON 如下:

我的 Flink 代码如下:

有了这个我得到以下错误

我可以使用地图、数组或行打印数据,但无法将这些数据保存为镶木地板。先感谢您。

0 投票
1 回答
187 浏览

apache-kafka - 找不到任何实现“org.apache.flink.table.factories.DeserializationFormatFactory”的标识符“avro-confluent”的工厂

我有一个 Flink 作业在本地运行良好,但当我尝试在集群上运行作业时失败。尝试通过 'connector' = 'kafka' 从 Kafka 加载数据时发生错误。我正在使用 Flink-Table API 和 confluent-avro 格式从 Kafka 读取数据。

所以基本上我创建了一个从 kafka 主题读取数据的表:

然后我创建了另一个表,我将使用它作为输出表:

当我在本地机器上运行它时,一切正常,但是当我在集群中运行它时,它崩溃了。

以下是我的 build.sbt:

类似的问题已发布在Flink 1.12 中找不到任何在类路径中实现 'org.apache.flink.table.factories.DynamicTableFactory' 的标识符 'kafka' 的工厂, 但在我的情况下添加提供的解决方案不起作用。

0 投票
1 回答
83 浏览

apache-flink - 进行表连接时如何获得最后一个结果(在flink sql中使用toRetractStream

这是我使用 flink sql API 连接两个表的代码

我的输出是这样的:

如您所见,toRetractStreamAPI 将生成三条记录。我想知道如何获得最后一条记录,它正确地加起来了A.speed_sumB.speed_sumA.cntB.cnt)。