问题标签 [flink-table-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
106 浏览

apache-flink - 在 Flink Table API 中应用翻转窗口聚合时,kafka 主题接收器没有结果

我正在使用lyft flink 运营商部署的 Flink 1.14

我正在尝试使用 Table API 进行翻转窗口聚合,从事务表源中读取,并将窗口聚合结果放入新的 kafka 主题

我的来源是 debezium 的一个 kafka 主题

我做翻滚窗口并通过以下方式计算同一窗口中的ID:

水槽是:

问题是当我使用 dbserver1.inventory.my-window-sink 时 kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning我没有得到任何结果,我等待 2 分钟(窗口大小),插入事务表,然后再次等待 2 分钟并再次插入也没有结果。我不知道我的水印是否有问题

我正在使用并行性:2

在 flink 仪表板 UI 上,我可以看到在 GroupWindowAggregate 任务的详细信息中,当我插入表时,收到的记录增加了,但是当我使用主题时,我仍然看不到结果!

0 投票
0 回答
50 浏览

mysql - 在 flink table api 中应用 flink tumble 窗口时没有结果

我在学习https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/table_api/时遇到了同样的问题。当我使用翻滚窗口时,我在 MySQL 接收器中什么也没有。

当我使用默认的 8 并行度时,我发现一些子任务在 web 前端没有收到来自上游的记录。 在此处输入图像描述

但是当我添加tEnv.getConfig().addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1));main 函数时,我在 MySQL sink 中得到了结果,Web 前端如下所示: 在此处输入图像描述

谁能告诉我为什么?

0 投票
1 回答
123 浏览

apache-flink - Apache Flink 1.14.0 - 无法通过 Java 中的 SQL DDL 使用 python UDF

我正在尝试在 SQL DDL(1.14.0) 中执行 python UDF 函数

Python文件在这里:</p>

并启动 flink 集群:

Java代码在这里:

然后通过 Flink 客户端运行作业:

错误是:

但是当我使用 sql-client 执行我的 py UDF 时,它运行成功。

启动 sql 客户端:

然后

create temporary system function add1 as 'udfTest.add_one' language python;

然后

select add1(3);

我得到了正确的结果4,我的代码有问题吗?

我看到版本支持 py UDF 函数1.11 https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL,但现在我使用的是 1.14.0。

谁能帮帮我!

0 投票
1 回答
51 浏览

java - Flink 无法从通过 PipelineOptions 添加的 JAR 加载类

我正在开发一个在 Flink 1.14 上使用 UDF 的 Java 应用程序。我正在使用PipelineOptions.JARS配置在应用程序代码中动态添加包含 UDF 类的 jar 文件,但是应用程序无法从配置的 jar 文件中加载 UDF 类ClassNotFoundException

我也试过PipelineOptions.CLASSPATHS了,它失败了,错误和堆栈跟踪完全相同。

如果通过 Flink CLI 使用“flink run”和“-C”选项更新类路径,相同的应用程序 jar 可以正常工作:

<FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ...

问题似乎是 table planner 中的 codegen 使用的 ClassLoader 的类路径没有根据传递给的 Configuration 进行更新StreamExecutionEnvironment,我不确定如何做到这一点。

以下是添加 UDF jar 文件和注册 UDF 的方式:

这是错误堆栈跟踪:

0 投票
1 回答
73 浏览

apache-flink - 如何将 Table 转换为包含数组类型的 DataStream(Flink)?

我有关于 Flink (1.13+) 的 table-api 的问题。我有一个包含多个字段的 POJO,其中之一是:

我为此字段使用以下声明创建表:

之后,当我尝试使用 toRetractStream[MY_POJO] 方法将表转换为 DataStream 时,出现以下错误:

线程“main” org.apache.flink.table.api.ValidationException 中的异常:未注册表的查询结果和接收器的列类型不匹配。

原因:位置 11 的接收器列“my_list”的类型不兼容。查询架构:[...,my_list:ARRAY,...] 接收器架构:[...,my_list:RAW('java.util.List', ?), ...]

我想避免手动映射每个字段并保持代码干净,是否有处理这种数据类型的解决方案?

0 投票
1 回答
53 浏览

apache-flink - Flink mysql 和 mysql-cdc 连接器的区别?

为了丰富数据流,我们计划将 MySQL(MemSQL)服务器连接到我们现有的 flink 流应用程序

我们可以看到 Flink 提供了一个带有 JDBC 连接器的 Table APIhttps://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/

此外,我发现了另一个名为 Flink-CDC 的 MySQL 连接器,https://ververica.github.io/flink-cdc-connectors/master/content/about.html它允许以流方式使用外部数据库

它们之间有什么区别?在我的情况下选择什么更好?

0 投票
0 回答
28 浏览

apache-flink - Flink 丰富来自外部数据库的数据流

我们需要使用存储在外部数据库中的附加信息来丰富数据流。
相关 Q. with arch diagram -如何组织一个复杂的 Apache Flink 应用程序?

此附加信息可能会发生变化,我们的应用程序必须对此做出响应。但这些变化很少见。

我发现要连接外部数据库,您需要使用 Table API。至少不支持将 DataStream JDBC 适配器作为数据源

组织此类应用程序的最佳方式是什么?以及哪个连接器,因为它们有很多。

如果使用 CDC 连接器?应用程序启动后如何延迟加载数据而不是从保存点?

0 投票
2 回答
43 浏览

java - 如何从 Mysql 数据库创建 DataStreamSource?

我在运行 flink 作业时遇到问题,该作业基本上是对 mysql 数据库运行查询,然后尝试创建一个必须从不同作业访问的临时视图。

我对 Flink 很陌生,我目前正在研究为所有这些提供的 API,但我实际上无法理解我做错了什么。在我看来,通过在作业结束时打印结果来测试这个过程似乎很简单,但我得到的唯一结果是这样的:

该作业的重点是创建一个临时表视图,用于通过查询该表视图来缓存将在其他 Flink 作业中使用的一些静态数据。

0 投票
1 回答
28 浏览

apache-kafka - 为什么 Flink Table SQL API upsert-kafka sink 连接器不创建日志压缩主题?

我正在尝试复制 Flink 的upsert-kafka 连接器示例

使用以下输入:

并创建了一个主题,其事件结构如下所示:

使用以下 kafka 上游,kafka-upsert sink 逻辑:

我希望只为 {"user_region":"TR"} 获得一个具有更新 pv: 2 的键,但是创建的主题似乎没有被日志压缩,因此观察到同一个 user_region 的两个事件:

upsert-kafka连接器不应该为此用例创建日志压缩主题,还是开发人员有责任更新主题配置?

另一种选择可能是我误解了某些东西或犯了错误。期待听到你的想法。谢谢。

0 投票
1 回答
26 浏览

apache-flink - 如何在 Flink Table API 中加入两个连续查询?

我想将两个连续查询(基于单个上游连接器的视图)堆叠在一起,并最终在管道末端的接收器结果中保持一致。

  • 第一个视图将按源删除重复的事件。
  • 第二个视图会将来自不同来源的相似事件合并为一个,完全外部连接第一个视图本身并使用 COALESCE。

一次只下沉这些转换中的一个似乎工作正常。最终产生一致的结果。

将它们组合在一起似乎不会处理第一个查询应该删除的迟到的重复事件(如果我们等待的时间足够长)并作为更新状态推送到第二个查询。

例如,如果我使用 toRetractStream 打印去重事件视图,我会得到 e1、e2、e3 作为结果。e4 被淘汰,因为它是 e3 的重复事件。

如果我打印第二个视图,我会得到 e1、e2、e3 和 e4 作为结果,因为 e3 和 e4 都有资格合并。但是,最终,当在第一个视图中发现 e4 是重复事件时,我希望从该结果中删除它。

是否可以使用 Flink Table API 做到这一点?

编辑:在下面分享一个示例用例。

在此管道的末尾,预计只有18e5dc326事件合并字段 true,因为从第一步开始,id为 21e5dc326的事件应该已被删除。

但是,第二步仍然以某种方式将其与另一个事件合并,因此在其合并字段中具有 true 。