问题标签 [flink-table-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 使用 jdbc 连接器将 flink DataStream 接收到具有覆盖的 mysql 接收器
我的用例是
- 从 AWS Kinesis 数据流中获取数据并使用 flink 数据流 api 过滤/映射
- 使用 StreamTable 环境对数据进行分组和聚合
- 使用 JDBC 连接器使用 SQLTableEnvironment 写入 mysql
我能够将我的数据流结果写入 mySQL 表,但由于流式传输它附加了每个新行,而我想覆盖。
apache-flink - Apache flink 全外连接的错误结果
我有 2 个数据流,它们是从 2 个表创建的,例如:
我想对这两个流执行完全外连接,我使用了这两个流orderRes1.fullOuterJoin(orderRes2 ,$(exp))
和一个包含完全外连接的 sql 查询,如下所示:
但是,在某些情况下,结果并不正确:假设用户 A 向 B 卖出 3 次价格,然后用户 B 向 A 卖出 2 次,第二次结果是:
7> (true,123,a,300.0,0.0)
7> (true,123,a,300.0,200.0)
10> (true,123,b,0.0,300.0)
10> (true,123,b,200.0,300.0)
第二行和第四行是流的预期结果,但它也会生成第一行和第三行。值得一提的是 coGroup 是另一种解决方案,但我不想在这种情况下使用窗口化,并且非窗口化解决方案只能在有界流(DataSet)中访问。
提示:orderId 和 userId 将在两个流中重复,我想在每个操作中生成 2 行,包含:orderId、userId1、bidTotalPrice、askTotalPrice 和 orderId、userId2、bidTotalPrice、askTotalPrice
apache-flink - Flink 嵌套类 toDataStream 转换报错
我正在使用 flink 1.13。我正在尝试通过以下方式将表结果转换为数据流,但不断出错。
我得到的错误是:
我也尝试过从 DataStream 到 table 的自定义转换,但是在从 table 转换到 DataStream 时仍然遇到错误。我被卡住了,所以任何帮助表示赞赏。
java - Flink Table,Create table 数组类型报错“ValidationException”
我创建了一个包含数据类型字段的flink表,报错类型不匹配。想知道如何在flink表中创建一个包含数组类型的临时表。
apache-flink - Apache Flink 1.13 版将表转换为数据集?
我正在将一些为 Flink 1.5 版编写的遗留 Java 代码转换为 Flink 1.13.1 版。具体来说,我正在使用 Table API。我必须从 CSV 文件中读取数据,执行一些基本的 SQL,然后将结果写回文件。
对于 Flink 1.5 版本,我使用以下代码执行上述操作
为了将上述代码转换为 Flink 1.13.1 版本,我编写了以下代码
但是,BatchTableEnvironment
在 Flink 1.13 版本中被标记为“已弃用”。是否有任何替代方法可以转换Table
为Dataset
或直接将 a 写入Table
文件?
java - 使用表 API 时如何将 java LocalDateTime 映射到 Flink TIMESTAMP
我的代码是这样的:
然后我在尝试调用 date_format 之后意识到字段 dt 不是 TIMESTAMP 。
然后我更新了代码:
新代码工作正常,但对我来说似乎有点扭曲,因为我必须添加一个基本什么都不做的地图。
所以我的问题是,将 java LocalDateTime 映射到 flink TIMESTAMP 的正确方法是什么?
我正在使用 Flink 1.13.0。
apache-flink - Flink Sql 未将 RAW('org.apache.avro.util.Utf8', '...') 转换为字符串
我正在从 kafka 流中读取数据,创建 Table 环境并计算平均值并将数据写回 kafka [SIMPLECUSTOMER]。
这在 Flink 1.12.5 中有效。我正在使用 Flink 1.13.2 和 Flink 1.14.0
customerId 被读取为 Avro Generated Java 类中定义的 RAW('org.apache.avro.util.Utf8', '...')。在写回接收器时,我收到以下错误。
org.apache.flink.table.api.ValidationException:注册表'default_catalog.default_database.SIMPLECUSTOMER'的查询结果和接收器的列类型不匹配。
原因:位置 0 的接收器列“customerId”的类型不兼容。
查询架构:[customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL] 接收器架构:[customerId: STRING, age: INT]
接收器表架构:
这是我的接收器代码
尝试将其转换为字符串,但这也不起作用
原因:org.apache.calcite.sql.validate.SqlValidatorException:Cast 函数无法将 RAW('org.apache.avro.util.Utf8', '...') 类型的值转换为 java 中的 VARCHAR(2147483647) 类型.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl。 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) 上的 newInstance(DelegatingConstructorAccessorImpl.java:45)
apache-flink - Flink 接收器文件系统作为镶木地板 - 保存嵌套数据时出错
我正在尝试将 json 数据转换为镶木地板,这样我就可以使用 Trino 或 presto 进行查询。示例 JSON 如下:
我的 Flink 代码如下:
有了这个我得到以下错误
我可以使用地图、数组或行打印数据,但无法将这些数据保存为镶木地板。先感谢您。
apache-kafka - 找不到任何实现“org.apache.flink.table.factories.DeserializationFormatFactory”的标识符“avro-confluent”的工厂
我有一个 Flink 作业在本地运行良好,但当我尝试在集群上运行作业时失败。尝试通过 'connector' = 'kafka' 从 Kafka 加载数据时发生错误。我正在使用 Flink-Table API 和 confluent-avro 格式从 Kafka 读取数据。
所以基本上我创建了一个从 kafka 主题读取数据的表:
然后我创建了另一个表,我将使用它作为输出表:
当我在本地机器上运行它时,一切正常,但是当我在集群中运行它时,它崩溃了。
以下是我的 build.sbt:
类似的问题已发布在Flink 1.12 中找不到任何在类路径中实现 'org.apache.flink.table.factories.DynamicTableFactory' 的标识符 'kafka' 的工厂, 但在我的情况下添加提供的解决方案不起作用。
apache-flink - 进行表连接时如何获得最后一个结果(在flink sql中使用toRetractStream
这是我使用 flink sql API 连接两个表的代码
我的输出是这样的:
如您所见,toRetractStream
API 将生成三条记录。我想知道如何获得最后一条记录,它正确地加起来了A.speed_sum
和B.speed_sum
(A.cnt
和B.cnt
)。