问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - PyFlink - 指定表格格式并处理嵌套的 JSON 字符串数据
我有一个这样的 JSON 数据对象:
该字段data
本身就是一个 JSON 对象字符串。我如何用 Flink 的 Table API 来表达这个模式?我尝试创建一个接收 JSON 字符串并输出解析内容的 UDF。但是,我找不到填充DataTypes.ROW
对象的方法:
maven - python Flink安装找不到文件/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_*.jar
我正在关注这个来安装 Flink
安装那里提到的所有依赖项后。运行得非常好
我正在执行这个命令
我收到上述命令的此错误
我也在mvn install
顶级模块上做过。
我在哪里可以得到这个 jar 文件?
(编辑)
我错了,我在构建 MVN 时遇到了问题
这就是我的罐子没有创建的原因。
python - Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF
根据这个合流页面:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
python udf 在 Flink 1.11 中可用于 SQL 函数。
我在这里访问了 flink 文档:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
并在终端上尝试此操作并使用以下参数启动sql-client.sh :
$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py
接着:
当我尝试时:
我尝试过使用:-pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles
在每一个组合中.zip, .py
,结果都是一样的。
顺便说一句,我的 python 文件如下所示:
有什么我想念的吗?
亲切的问候,
乔纳森
rabbitmq - pyflink 1.11 的 RabbitMQ 自定义表源和接收器
根据这里的文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
和
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
是否可以创建用于 pyflink TABLE API 1.11 的自定义 ddl RabbitMQ 连接器?
如何?
apache-flink - pyflink Table api中的Py4JJavaError
此代码将 pandas 转换为 flink table 进行转换,而不是再次转换回 pandas。filter
filter
当我使用than时它工作得很好,但是当我添加andselect
时给我一个错误。group_by
order_by
我的错误是
python - 不止一次在 Apache Flink 中计算时间属性
正如标题所说,我想在 flink 作业中多次进行涉及时间属性的计算。文档中提到,一旦在计算中使用了时间属性,它就会成为常规时间戳,并且无法再次用于计算。有什么方法可以两次使用 time 属性,还是我需要为此创建另一个 flink 作业?
python - 使用 Apache-flink 将处理后的流数据接收到数据库中
是否可以使用 pyflink 将处理后的流数据下沉到数据库中?所有写入处理数据的方法都仅限于将它们保存为 txt、csv 或 Json 格式,并且无法使用数据库接收数据。
python - Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF
在Flip-106
有一个如何通过 SQL 函数 DDL 在批处理作业 java 应用程序中调用用户定义的 python 函数的示例......
我一直在尝试在流式作业 java 应用程序中重现这个相同的示例,这是我的代码:
对于批处理作业中的这一特定行:
我还没有找到流媒体作业的等价物
1. 你能帮我把这个翻转 106 的例子从批处理映射到流吗?
我最终想要的是用 flink 1.11 在流式作业 java flink 应用程序中调用一个 python 函数,如下所示:
并使用该udf的结果进行进一步处理(不一定在控制台中打印)
我已经编辑了该test.py
文件,以查看是否至少不管未命名的表在 python 中是否正在执行某些操作。
并且没有打印任何内容,未创建 test.txt 文件,并且该值未返回到流式作业。所以基本上这个python函数没有被调用。
2.我在这里缺少什么?
感谢 David、Wei 和 Xingbo 迄今为止的支持,因为建议的每一个细节都对我有用。
此致,
乔纳森
java - Flink Python 自定义连接器/源
我想在 pyflink 中创建一个自定义的用户定义的连接器/源。我在 Java / Scala 中看到了这样做的文档,但在 Python 中没有看到。这可能吗?
python - 如何将 CSV 作为流表源加载到 PyFlink 中?
我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的作业将来自 Kafka 或 Kenesis 队列,但这使得玩弄想法(和测试)变得非常困难。
我可以愉快地从 CSV 加载并以批处理模式对其进行处理。但我无法让它在流媒体模式下工作。我将如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 Windows)。
我知道我需要让系统使用 EventTime(因为 ProcTime 会同时出现),但无论如何我都找不到设置它。原则上,我应该能够将 CSV 的列之一设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。
为了让批处理执行测试运行,我使用了下面的代码,它从一个读取input.csv
并输出到一个output.csv
.
和 input.csv 是
所以我的问题是如何设置它以便它从同一个 CSV 读取,但使用第一列作为事件时间并允许我编写如下代码:
任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用python 3.7
和flink 1.11.1
。