问题标签 [flink-table-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 从 Postgres 表创建 Flink 数据流
我正在尝试处理大量数据流(源 = Kinesis 流)并沉入 Postgres DB。这样做时,我需要首先将传入流与一些已经存在于 Postgres DB 中的主数据连接起来。
我正在从传入的 kinesis 流创建一个键控流,并使用 JDBC 目录使用 Flink 表 API 创建第二个流。我已按如下方式设置了我的数据库接收器:
这样每次 Flinks 进行检查点接收器时都会被触发。
但是,当我与来自 JDBC 源的传入流进行连接时,我得到以下信息:
这阻塞了我的接收器,因为检查点每次都会中止。
看来我的 JDBC 源代码很早就完成了,当 Flink 尝试检查点时,它没有找到任何正在运行的作业并中止检查点。Flink 似乎有一个限制,它仅在所有操作员/任务仍在运行时才检查点
https://issues.apache.org/jira/browse/FLINK-2491
我正在按如下方式设置我的 JDBC 流:
这是正确的理解吗?这里有办法吗?
pandas - 将 Flink 动态表转换为 Pandas 数据框
我正在使用 pyflink table api 从 Kafka 读取数据。现在我想将结果表转换为 Pandas 数据框。这是我的代码,
但在这里我既没有得到错误也没有得到结果。我正在使用 Flink 1.11.3。有没有办法将此动态表转换为静态表或使 table.to_pandas() 工作的东西?
apache-flink - Apache Flink 1.11 流式接收器到 S3
我正在使用 Flink FileSystem SQL 连接器从 Kafka 读取事件并写入 S3(使用 MinIo)。这是我的代码,
我正在使用 Flink 1.11.3 和 flink-s3-fs-hadoop 1.11.3。我已将 flink-s3-fs-hadoop-1.11.3.jar 复制到 plugins 文件夹中。
我还在 flink-conf.yaml 中添加了以下配置。
MinIo 运行正常,我在 MinIo 中创建了“测试桶”。当我运行这个作业时,作业提交不会发生,Flink Dashboard 进入某种等待状态。15-20 分钟后,我得到以下异常,
这里似乎有什么问题?
apache-kafka - 我们可以使用 Flink kafka Upsert 连接器连接到/从 Kafka 压缩主题吗?
这感觉很明显,但我还是在问,因为我在文档中找不到明确的确认:
Flink 1.12 中可用的Flink Table API upsert kafka 连接器的语义与 Kafka 压缩主题的语义非常匹配:将流解释为变更日志并将NULL
值用作墓碑来标记删除。
所以我的假设是可以使用它来消费和生产压缩主题,并且它可能正是为此而制作的,尽管假设它的内容确实是一个变更日志,它应该也适用于非压缩主题。但是我很惊讶在文档的那部分中没有找到对压缩主题的任何引用。
有人可以确认或证实这个假设吗?
apache-flink - Flink表异常:窗口聚合只能定义在一个时间属性列上,但遇到TIMESTAMP(6)
我正在使用 flink 1.12.0。尝试将数据流转换为表 A 并在表 A 上运行 sql 查询以在窗口上聚合,如下所示。我使用 f2 列作为其时间戳数据类型字段。
当我执行上面的代码时,我得到
线程“主”org.apache.flink.table.api.TableException 中的异常:窗口聚合只能在时间属性列上定义,但遇到 TIMESTAMP(6)。在 org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50) 在 org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase. scala:81) 在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) 在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) 在 org.apache.calcite .plan.hep.HepPlanner.applyRules(HepPlanner.java:407) 在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) 在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance .
apache-flink - Flink Table API 中如何为表中的每一行分配一个唯一的 ID?
我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也存储在 S3 中。这使得可以查看计算中每个中间步骤的数据并查看每个操作的效果。
我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在以下步骤中再次出现(可能在不同的列中)时,我知道两行相互关联。
第一个明显的候选似乎是ROW_NUMBER()
函数,但是:
它似乎不在表表达式 API 的任何地方。我必须构造 SQL 字符串吗?
我该如何使用它?当我尝试这个查询时:
SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
我收到此错误:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
它总是需要对表格进行排序吗?这似乎是我宁愿避免的开销。
下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,相同的 UUID 永远不会被使用两次,所以它完全没用。这是一个例子:
我得到的输出:
我需要 UUIDout1
重新出现out2
在两列中,例如:
我想这是由于文档中的这个注释:
此函数不是确定性的,这意味着将为每条记录重新计算该值。
如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到out1
和out2
?
我使用用户定义的函数得到了类似的结果:
apache-kafka - Flink 表和 Hive 目录存储
我有一个 kafka 主题和一个 Hive Metastore。我想将来自 kafka 主题的传入事件与元存储的记录一起加入。我看到了 Flink 使用目录来查询 Hive Metastore 的可能性。所以我看到了两种处理方法:
- 使用 DataStream api 使用 kafka 主题并在 processFunction 或类似的东西中以一种或另一种方式查询 Hive 目录
- 使用 Table-Api,我将从 kafka 主题创建一个表并将其与 Hive 目录连接
我最大的担忧是与存储相关的。在这两种情况下,什么存储在内存中,什么不存储?Hive 目录是否在 Flink 的集群端存储任何内容?在第二种情况下,表是如何处理的?flink 会创建副本吗?
哪种解决方案似乎最好?(也许两者都不是好选择)
apache-flink - Flink Table-API 和 DataStream ProcessFunction
我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我成功地加入了我的测试,将 table-api 与 datastream api 混合在一起。我做了以下事情:
它正在工作,但我从未在任何地方看到过这种类型的实现。可以吗?有什么缺点?
apache-flink - 在 Google Cloud Storage 中使用 Flink Table API(Flink version-1.12.0)存储 parquet 文件时出现内存不足错误堆
希望你一切顺利。我们目前正在使用 Flink Table API (Flink Version-1.12.0) 从 Kafka 流式传输数据并将其存储在 Google Cloud Storage 中。我们用来存储数据的文件格式是 Parquet。最初,Flink 工作运行良好,我们能够流式传输数据并将其成功存储在 Google Cloud Storage 中。但是我们注意到,一旦我们增加了输入数据的基数并且增加了到 Kafka 的数据量,即每秒向 Kafka 流更多的事件,我们注意到 Flink 作业会抛出以下错误:
- 超过 GC 超限
- Java 堆内存空间不足 - 错误。
我们尝试使用 Kubernetes Cluster 和 flink 在 YARN 上运行 flink。在这两种情况下,随着数据量的增加,我们都看到了上述错误。我们为作业管理器提供了 2 个任务管理器,每个 10 GB 和 1 GB。我们的 flink 作业的检查点间隔是 3 分钟。我知道 Flink- https: //issues.apache.org/jira/browse/FLINK-20945 中存在一个错误。请让我知道,如果有办法解决这个问题。
apache-flink - Flink TableAPI 中的后续窗口分组导致 RuntimeException
我正在开发一个使用 Apache Flink 的 Table API (1.12.0) 对图形流进行分组/汇总的项目。在我们的算法中,我们首先处理顶点,即将它们分组并聚合一些属性。这是我的应用程序的片段:
第一个窗口分组是删除重复项,因为多个边可以具有相同的源/目标顶点(即,相同的 id、标签等)和不同的时间戳(时间戳是从边获取的)。第二个分组用于按给定配置对不同的顶点进行分组并聚合其值。
问题:当我groupedVertices
在后续步骤中使用时,比如只是投影(请参阅我的剪辑的最后几行),我得到以下异常(请参阅评论中的 Jira 问题)。似乎别名w1_rowtime
不再“注册”。
有谁知道这种情况的解决方案或解决方法?