问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段
我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
文档建议它应该是一种类型,MAP
但是当我设置它时,我收到以下错误
这是我的 SQL
我的 JSON 看起来像这样:
mysql - Required context properties mismatch in connecting the flink with mysql database
I am using flink latest (1.11.2) to work with a sample mysql database, which the database is working fine.
Additionally, i have added the flink-connector-jdbc_2.11-1.11.2, mysql-connector-java-8.0.21.jar, postgresql-42.2.17.jar to the {FLINK}/lib
Here is my code
However when running the code, it come up with error saying
latest:
this is my docker yml file.
docker ps commands show out
more info:
my current flink environment in docker is flink:scala_2.12-java8
pyflink jdbc connector is flink-connector-jdbc_2.11-1.11.2.jar from flink 1.11 version.
in order to use the jdbc library, I tried two ways
save the flink-connector-jdbc_2.11-1.11.2.jar into /usr/local/lib/python3.7/site-packages/flink/lib
configure the classpath in the python app
but still getting the same error
apache-flink - PyFlink 1.11.2 注册自定义 UDF 函数时无法配置 [taskmanager.memory.task.off-heap.size] 属性
我是 pyflink 的新手,并尝试使用 python API 注册自定义 UDF 函数。目前,我在服务器环境和本地 IDE 环境中都遇到了问题。
当我尝试执行下面的示例时,我收到一条错误消息:配置的任务堆外内存 0 字节小于最少需要的 Python 工作者内存 79 mb。任务堆外内存可以使用配置键'taskmanager.memory.task.off-heap.size
当然,我已经在我的flink-conf.yaml中添加了 required 属性,并检查了pyflink-shell.sh 是否使用指定的配置初始化了 env ,但这没有任何意义,我仍然有一个错误。
这是我正在尝试运行的代码示例:
谢谢你的帮助!
postgresql - pyflink JDBC Postgresql Catalog 对数据类型 UUID 抛出错误,如何处理 Flink Table API 中的 uuid 数据类型?
Apache Flink 1.11.0 Python 表 API 目录:postgresql
通过 Table API 从包含 UUID 数据类型列的 postgresql 目录表中读取和写入数据,抛出 UUID 数据类型 unsupportedOperatorException。
如何处理 pyFlink 中的 UUID 数据类型?
apache-flink - PyFlink - 在 JAR 中使用 Scala UDF 的问题
我正在尝试使用外部 JAR 在 Pyflink 中注册 Scala UDF,如下所示,但出现以下错误。
斯卡拉 UDF:
构建.sbt:
组装.sbt:
斯卡拉项目结构:
壳:
Python:
错误(包括重新启动 Anaconda 后):
版本:
知道问题可能是什么吗?
谢谢你的支持
apache-flink - PyFlink - UNNEST 问题:查询使用不受支持的 SQL 功能?
我正在尝试使用 Table API 中的 UNNEST 函数来展平数组。
我做错了什么还是它不是受支持的功能?这个页面虽然建议它:https ://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html
谢谢 !
代码
Python udf
加工
执行
错误
apache-flink - PyFlink - Kafka - 缺少模块
我试图从 PyFlink 和 Kafka 开始,但得到以下错误。
谢谢你的支持 !
安装
代码
错误
apache-flink - PyFlink - JSON 文件接收器?
是否可以像使用 CSV 一样在 Table API 和/或 DataStream API 中使用 JSON 文件接收器?
谢谢 !
代码
错误
apache-flink - PyFlink - DataStream API - 缺少模块
我正在尝试从 DataStream API 开始,但缺少一个模块。
知道有什么问题吗?
版本
蟒蛇 3.7.9
代码
错误
apache-flink - PyFlink - Scala UDF - 如何在 Table API 中转换 Scala Map?
我正在尝试将Map[String,String]
我的 Scala UDF ( scala.collection.immutable.map
) 的对象输出映射到 Table API 中的某些有效数据类型,即通过java.util.Map
此处推荐的 Java 类型 ():Flink Table API & SQL and map types (Scala)。但是我得到以下错误。
任何关于正确方法的想法?如果是,有没有办法将转换概括为类型的(嵌套)Scala 对象Map[String,Any]
?
代码
斯卡拉 UDF
下沉
错误
谢谢 !