问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1613 浏览

apache-flink - 使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段

我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。

文档建议它应该是一种类型,MAP但是当我设置它时,我收到以下错误

这是我的 SQL

我的 JSON 看起来像这样:

0 投票
2 回答
430 浏览

mysql - Required context properties mismatch in connecting the flink with mysql database

I am using flink latest (1.11.2) to work with a sample mysql database, which the database is working fine.

Additionally, i have added the flink-connector-jdbc_2.11-1.11.2, mysql-connector-java-8.0.21.jar, postgresql-42.2.17.jar to the {FLINK}/lib

Here is my code

However when running the code, it come up with error saying

latest:

this is my docker yml file.

docker ps commands show out

more info:

my current flink environment in docker is flink:scala_2.12-java8

pyflink jdbc connector is flink-connector-jdbc_2.11-1.11.2.jar from flink 1.11 version.

in order to use the jdbc library, I tried two ways

  1. save the flink-connector-jdbc_2.11-1.11.2.jar into /usr/local/lib/python3.7/site-packages/flink/lib

  2. configure the classpath in the python app

but still getting the same error

0 投票
1 回答
184 浏览

apache-flink - PyFlink 1.11.2 注册自定义 UDF 函数时无法配置 [taskmanager.memory.task.off-heap.size] 属性

我是 pyflink 的新手,并尝试使用 python API 注册自定义 UDF 函数。目前,我在服务器环境和本地 IDE 环境中都遇到了问题。

当我尝试执行下面的示例时,我收到一条错误消息:配置的任务堆外内存 0 字节小于最少需要的 Python 工作者内存 79 mb。任务堆外内存可以使用配置键'taskmanager.memory.task.off-heap.size

当然,我已经在我的flink-conf.yaml中添加了 required 属性,并检查了pyflink-shell.sh 是否使用指定的配置初始化了 env ,但这没有任何意义,我仍然有一个错误。

这是我正在尝试运行的代码示例:

谢谢你的帮助!

0 投票
2 回答
209 浏览

postgresql - pyflink JDBC Postgresql Catalog 对数据类型 UUID 抛出错误,如何处理 Flink Table API 中的 uuid 数据类型?

Apache Flink 1.11.0 Python 表 API 目录:postgresql

通过 Table API 从包含 UUID 数据类型列的 postgresql 目录表中读取和写入数据,抛出 UUID 数据类型 unsupportedOperatorException。

如何处理 pyFlink 中的 UUID 数据类型?

0 投票
1 回答
152 浏览

apache-flink - PyFlink - 在 JAR 中使用 Scala UDF 的问题

我正在尝试使用外部 JAR 在 Pyflink 中注册 Scala UDF,如下所示,但出现以下错误。

斯卡拉 UDF:

构建.sbt:

组装.sbt:

斯卡拉项目结构:

壳:

Python:

错误(包括重新启动 Anaconda 后):

版本:

知道问题可能是什么吗?

谢谢你的支持

0 投票
1 回答
320 浏览

apache-flink - PyFlink - UNNEST 问题:查询使用不受支持的 SQL 功能?

我正在尝试使用 Table API 中的 UNNEST 函数来展平数组。

我做错了什么还是它不是受支持的功能?这个页面虽然建议它:https ://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html

谢谢 !

代码

Python udf

加工

执行

错误

0 投票
1 回答
1372 浏览

apache-flink - PyFlink - Kafka - 缺少模块

我试图从 PyFlink 和 Kafka 开始,但得到以下错误。

谢谢你的支持 !

安装

代码

错误

0 投票
1 回答
424 浏览

apache-flink - PyFlink - JSON 文件接收器?

是否可以像使用 CSV 一样在 Table API 和/或 DataStream API 中使用 JSON 文件接收器?

谢谢 !

代码

错误

0 投票
1 回答
537 浏览

apache-flink - PyFlink - DataStream API - 缺少模块

我正在尝试从 DataStream API 开始,但缺少一个模块。

知道有什么问题吗?

版本

蟒蛇 3.7.9

代码

错误

0 投票
1 回答
343 浏览

apache-flink - PyFlink - Scala UDF - 如何在 Table API 中转换 Scala Map?

我正在尝试将Map[String,String]我的 Scala UDF ( scala.collection.immutable.map) 的对象输出映射到 Table API 中的某些有效数据类型,即通过java.util.Map此处推荐的 Java 类型 ():Flink Table API & SQL and map types (Scala)。但是我得到以下错误。

任何关于正确方法的想法?如果是,有没有办法将转换概括为类型的(嵌套)Scala 对象Map[String,Any]

代码

斯卡拉 UDF

下沉

错误

谢谢 !