问题标签 [flink-table-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
353 浏览

apache-flink - 从 Postgres 表创建 Flink 数据流

我正在尝试处理大量数据流(源 = Kinesis 流)并沉入 Postgres DB。这样做时,我需要首先将传入流与一些已经存在于 Postgres DB 中的主数据连接起来。

我正在从传入的 kinesis 流创建一个键控流,并使用 JDBC 目录使用 Flink 表 API 创建第二个流。我已按如下方式设置了我的数据库接收器:

这样每次 Flinks 进行检查点接收器时都会被触发。

但是,当我与来自 JDBC 源的传入流进行连接时,我得到以下信息:

这阻塞了我的接收器,因为检查点每次都会中止。

看来我的 JDBC 源代码很早就完成了,当 Flink 尝试检查点时,它没有找到任何正在运行的作业并中止检查点。Flink 似乎有一个限制,它仅在所有操作员/任务仍在运行时才检查点

https://issues.apache.org/jira/browse/FLINK-2491

我正在按如下方式设置我的 JDBC 流:

这是正确的理解吗?这里有办法吗?

0 投票
1 回答
195 浏览

pandas - 将 Flink 动态表转换为 Pandas 数据框

我正在使用 pyflink table api 从 Kafka 读取数据。现在我想将结果表转换为 Pandas 数据框。这是我的代码,

但在这里我既没有得到错误也没有得到结果。我正在使用 Flink 1.11.3。有没有办法将此动态表转换为静态表或使 table.to_pandas() 工作的东西?

0 投票
0 回答
650 浏览

apache-flink - Apache Flink 1.11 流式接收器到 S3

我正在使用 Flink FileSystem SQL 连接器从 Kafka 读取事件并写入 S3(使用 MinIo)。这是我的代码,

我正在使用 Flink 1.11.3 和 flink-s3-fs-hadoop 1.11.3。我已将 flink-s3-fs-hadoop-1.11.3.jar 复制到 plugins 文件夹中。

我还在 flink-conf.yaml 中添加了以下配置。

MinIo 运行正常,我在 MinIo 中创建了“测试桶”。当我运行这个作业时,作业提交不会发生,Flink Dashboard 进入某种等待状态。15-20 分钟后,我得到以下异常,

这里似乎有什么问题?

0 投票
1 回答
332 浏览

apache-kafka - 我们可以使用 Flink kafka Upsert 连接器连接到/从 Kafka 压缩主题吗?

感觉很明显,但我还是在问,因为我在文档中找不到明确的确认:

Flink 1.12 中可用的Flink Table API upsert kafka 连接器的语义与 Kafka 压缩主题的语义非常匹配:将流解释为变更日志并将NULL值用作墓碑来标记删除。

所以我的假设是可以使用它来消费和生产压缩主题,并且它可能正是为此而制作的,尽管假设它的内容确实是一个变更日志,它应该也适用于非压缩主题。但是我很惊讶在文档的那部分中没有找到对压缩主题的任何引用。

有人可以确认或证实这个假设吗?

0 投票
2 回答
1274 浏览

apache-flink - Flink表异常:窗口聚合只能定义在一个时间属性列上,但遇到TIMESTAMP(6)

我正在使用 flink 1.12.0。尝试将数据流转换为表 A 并在表 A 上运行 sql 查询以在窗口上聚合,如下所示。我使用 f2 列作为其时间戳数据类型字段。

当我执行上面的代码时,我得到

线程“主”org.apache.flink.table.api.TableException 中的异常:窗口聚合只能在时间属性列上定义,但遇到 TIMESTAMP(6)。在 org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50) 在 org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase. scala:81) 在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) 在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) 在 org.apache.calcite .plan.hep.HepPlanner.applyRules(HepPlanner.java:407) 在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) 在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance .

0 投票
0 回答
348 浏览

apache-flink - Flink Table API 中如何为表中的每一行分配一个唯一的 ID?

我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也存储在 S3 中。这使得可以查看计算中每个中间步骤的数据并查看每个操作的效果。

我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在以下步骤中再次出现(可能在不同的列中)时,我知道两行相互关联。

第一个明显的候选似乎是ROW_NUMBER()函数,但是:

  1. 它似乎不在表表达式 API 的任何地方。我必须构造 SQL 字符串吗?

  2. 我该如何使用它?当我尝试这个查询时:

    SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp

    我收到此错误:

    org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.

  3. 它总是需要对表格进行排序吗?这似乎是我宁愿避免的开销。

下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,相同的 UUID 永远不会被使用两次,所以它完全没用。这是一个例子:

我得到的输出:

我需要 UUIDout1重新出现out2在两列中,例如:

我想这是由于文档中的这个注释:

此函数不是确定性的,这意味着将为每条记录重新计算该值。

如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到out1out2

我使用用户定义的函数得到了类似的结果:

0 投票
1 回答
176 浏览

apache-kafka - Flink 表和 Hive 目录存储

我有一个 kafka 主题和一个 Hive Metastore。我想将来自 kafka 主题的传入事件与元存储的记录一起加入。我看到了 Flink 使用目录来查询 Hive Metastore 的可能性。所以我看到了两种处理方法:

  • 使用 DataStream api 使用 kafka 主题并在 processFunction 或类似的东西中以一种或另一种方式查询 Hive 目录
  • 使用 Table-Api,我将从 kafka 主题创建一个表并将其与 Hive 目录连接

我最大的担忧是与存储相关的。在这两种情况下,什么存储在内存中,什么不存储?Hive 目录是否在 Flink 的集群端存储任何内容?在第二种情况下,表是如何处理的?flink 会创建副本吗?

哪种解决方案似乎最好?(也许两者都不是好选择)

0 投票
1 回答
140 浏览

apache-flink - Flink Table-API 和 DataStream ProcessFunction

我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我成功地加入了我的测试,将 table-api 与 datastream api 混合在一起。我做了以下事情:

它正在工作,但我从未在任何地方看到过这种类型的实现。可以吗?有什么缺点?

0 投票
0 回答
47 浏览

apache-flink - 在 Google Cloud Storage 中使用 Flink Table API(Flink version-1.12.0)存储 parquet 文件时出现内存不足错误堆

希望你一切顺利。我们目前正在使用 Flink Table API (Flink Version-1.12.0) 从 Kafka 流式传输数据并将其存储在 Google Cloud Storage 中。我们用来存储数据的文件格式是 Parquet。最初,Flink 工作运行良好,我们能够流式传输数据并将其成功存储在 Google Cloud Storage 中。但是我们注意到,一旦我们增加了输入数据的基数并且增加了到 Kafka 的数据量,即每秒向 Kafka 流更多的事件,我们注意到 Flink 作业会抛出以下错误:

  1. 超过 GC 超限
  2. Java 堆内存空间不足 - 错误。

我们尝试使用 Kubernetes Cluster 和 flink 在 YARN 上运行 flink。在这两种情况下,随着数据量的增加,我们都看到了上述错误。我们为作业管理器提供了 2 个任务管理器,每个 10 GB 和 1 GB。我们的 flink 作业的检查点间隔是 3 分钟。我知道 Flink- https: //issues.apache.org/jira/browse/FLINK-20945 中存在一个错误。请让我知道,如果有办法解决这个问题。

0 投票
0 回答
27 浏览

apache-flink - Flink TableAPI 中的后续窗口分组导致 RuntimeException

我正在开发一个使用 Apache Flink 的 Table API (1.12.0) 对图形流进行分组/汇总的项目。在我们的算法中,我们首先处理顶点,即将它们分组并聚合一些属性。这是我的应用程序的片段:

第一个窗口分组是删除重复项,因为多个边可以具有相同的源/目标顶点(即,相同的 id、标签等)和不同的时间戳(时间戳是从边获取的)。第二个分组用于按给定配置对不同的顶点进行分组并聚合其值。

问题:当我groupedVertices在后续步骤中使用时,比如只是投影(请参阅我的剪辑的最后几行),我得到以下异常(请参阅评论中的 Jira 问题)。似乎别名w1_rowtime不再“注册”。

有谁知道这种情况的解决方案或解决方法?