问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
607 浏览

apache-flink - PyFlink - 指定表格格式并处理嵌套的 JSON 字符串数据

我有一个这样的 JSON 数据对象:

该字段data本身就是一个 JSON 对象字符串。我如何用 Flink 的 Table API 来表达这个模式?我尝试创建一个接收 JSON 字符串并输出解析内容的 UDF。但是,我找不到填充DataTypes.ROW对象的方法:

0 投票
1 回答
283 浏览

maven - python Flink安装找不到文件/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_*.jar

我正在关注这个来安装 Flink

安装那里提到的所有依赖项后。运行得非常好

我正在执行这个命令

我收到上述命令的此错误

我也在mvn install顶级模块上做过。

我在哪里可以得到这个 jar 文件?

(编辑)

我错了,我在构建 MVN 时遇到了问题

这就是我的罐子没有创建的原因。

0 投票
1 回答
605 浏览

python - Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF

根据这个合流页面:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL

python udf 在 Flink 1.11 中可用于 SQL 函数。

我在这里访问了 flink 文档:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html

并在终端上尝试此操作并使用以下参数启动sql-client.sh :

$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py

接着:

当我尝试时:

我尝试过使用:-pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles在每一个组合中.zip, .py,结果都是一样的。

顺便说一句,我的 python 文件如下所示:

有什么我想念的吗?

亲切的问候,

乔纳森

0 投票
1 回答
217 浏览

rabbitmq - pyflink 1.11 的 RabbitMQ 自定义表源和接收器

根据这里的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

是否可以创建用于 pyflink TABLE API 1.11 的自定义 ddl RabbitMQ 连接器?

如何?

0 投票
1 回答
302 浏览

apache-flink - pyflink Table api中的Py4JJavaError

此代码将 pandas 转换为 flink table 进行转换,而不是再次转换回 pandas。filter filter当我使用than时它工作得很好,但是当我添加andselect时给我一个错误。group_byorder_by

我的错误是

0 投票
0 回答
20 浏览

python - 不止一次在 Apache Flink 中计算时间属性

正如标题所说,我想在 flink 作业中多次进行涉及时间属性的计算。文档中提到,一旦在计算中使用了时间属性,它就会成为常规时间戳,并且无法再次用于计算。有什么方法可以两次使用 time 属性,还是我需要为此创建另一个 flink 作业?

0 投票
1 回答
658 浏览

python - 使用 Apache-flink 将处理后的流数据接收到数据库中

是否可以使用 pyflink 将处理后的流数据下沉到数据库中?所有写入处理数据的方法都仅限于将它们保存为 txt、csv 或 Json 格式,并且无法使用数据库接收数据。

0 投票
1 回答
304 浏览

python - Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF

Flip-106有一个如何通过 SQL 函数 DDL 在批处理作业 java 应用程序中调用用户定义的 python 函数的示例......

我一直在尝试在流式作业 java 应用程序中重现这个相同的示例,这是我的代码:

对于批处理作业中的这一特定行:

我还没有找到流媒体作业的等价物

1. 你能帮我把这个翻转 106 的例子从批处理映射到流吗?

我最终想要的是用 flink 1.11 在流式作业 java flink 应用程序中调用一个 python 函数,如下所示:

并使用该udf的结果进行进一步处理(不一定在控制台中打印)

我已经编辑了该test.py文件,以查看是否至少不管未命名的表在 python 中是否正在执行某些操作。

并且没有打印任何内容,未创建 test.txt 文件,并且该值未返回到流式作业。所以基本上这个python函数没有被调用。

2.我在这里缺少什么?

感谢 David、Wei 和 Xingbo 迄今为止的支持,因为建议的每一个细节都对我有用。

此致,

乔纳森

0 投票
1 回答
218 浏览

java - Flink Python 自定义连接器/源

我想在 pyflink 中创建一个自定义的用户定义的连接器/源。我在 Java / Scala 中看到了这样做的文档,但在 Python 中没有看到。这可能吗?

0 投票
2 回答
743 浏览

python - 如何将 CSV 作为流表源加载到 PyFlink 中?

我正在尝试设置一个简单的游乐场环境来使用 Flink Python Table API。我最终尝试编写的作业将来自 Kafka 或 Kenesis 队列,但这使得玩弄想法(和测试)变得非常困难。

我可以愉快地从 CSV 加载并以批处理模式对其进行处理。但我无法让它在流媒体模式下工作。我将如何在 StreamingExecutionEnvironment 中做类似的事情(主要是为了让我可以玩 Windows)。

我知道我需要让系统使用 EventTime(因为 ProcTime 会同时出现),但无论如何我都找不到设置它。原则上,我应该能够将 CSV 的列之一设置为事件时间,但文档中不清楚如何执行此操作(或者如果可能的话)。

为了让批处理执行测试运行,我使用了下面的代码,它从一个读取input.csv并输出到一个output.csv.

和 input.csv 是

所以我的问题是如何设置它以便它从同一个 CSV 读取,但使用第一列作为事件时间并允许我编写如下代码:

任何帮助将不胜感激,我无法从文档中解决这个问题。我正在使用python 3.7flink 1.11.1