问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
573 浏览

apache-flink - 得到“pyflink.util.exceptions.TableException:findAndCreateTableSource 失败。” 运行 PyFlink 示例时

我在 PyFlink 程序下运行(从https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html复制)

为了验证它是否有效,我按顺序执行了以下操作:

  1. echo -e "flink\npyflink\nflink" > /tmp/input
  2. python WordCount.py
  3. 运行cat /tmp/out并找到预期的输出

然后我稍微改变了我的 PyFlink 程序,使其更喜欢 SQL 而不是 Table API,但我发现它不起作用。

这是错误:

我想知道我的新程序有什么问题?

0 投票
1 回答
186 浏览

apache-flink - 为什么 Flink FileSystem sink 拆分成多个文件

我想使用 Flink 从输入文件中读取,进行一些聚合,然后将结果写入输出文件。作业处于批处理模式。见wordcount.py下文:

在运行之前python wordcount.py,我运行echo -e "flink\npyflink\nflink" > /tmp/input以确保 /tmp/input 中存在数据。但是,运行后,/tmp/output 中有两个文件:

虽然我希望单个文件 /tmp/output 包含内容:

实际上,我通过调整以下生成单个文件 /tmp/output.

运行此版本将生成 /tmp/output。请注意,它不带有逗号分隔符。

知道为什么吗?谢谢!

0 投票
1 回答
297 浏览

apache-flink - PyFlink java.io.EOFException 在 java.io.DataInputStream.readFully

我有一个 PyFlink 作业,它从文件中读取,根据条件过滤并打印。这是tree我的工作目录的视图。这是 PyFlink 脚本 main.py:

运行main.py

  • 确保在我的 conda env 中安装 pyflink==1.12.0
  • /tmp/input 有单行内容{"id":1,"tag":"a"}

然后我跑main.py了,我得到了异常:

该问题可能与udf有关。有什么帮助吗?谢谢!

0 投票
1 回答
230 浏览

apache-flink - PyFlink 数据流 API 支持窗口化

Apache Flink 的 Python SDK ( PyFlink ) Datastream API 是否支持 Windowing 等运算符?到目前为止,我看到的任何使用 PyFlink 进行窗口化的示例都使用 Table API。Datastream API确实支持这些运算符,但看起来这些还不能通过 PyFlink 获得?

谢谢!

0 投票
1 回答
141 浏览

apache-flink - 无法对 PyFlink UDF 进行单元测试

我正在使用 PyFlink,我想对我用 Python 编写的 UDF 进行单元测试。

要测试下面的简单 udf:

我创建了一个应该失败的测试文件:

可悲的是,如果我运行它就会通过pytest

@udf(input_types=[...], result_type=...)但是,如果我从 udf.py中删除注释,测试将按预期失败。

结果:

完整示例可以在https://github.com/YikSanChan/how-to-pytest-flink找到。

0 投票
0 回答
356 浏览

apache-flink - PyFlink 从 JSON 数组中提取嵌套字段

我正在尝试从从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每条记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0].

我正在使用 Table API 从 Kafka 主题中读取数据并将提取的字段写入另一个主题。

源DDL如下:

对应的 sink DDL 为:

value这是从数组的第一个元素中提取字段的代码片段:

execute_insert当我执行此步骤时,我在步骤中看到以下错误。

但是,如果我不提取嵌入的value,而是提取数组的整行,即table.Result.data.at(1)sink_ddl适当地修改,我就能够正确地获取整行。

任何想法,我错过了什么?感谢您的任何指点!

编辑:这可能是 Flink 中的一个错误,它被https://issues.apache.org/jira/browse/FLINK-22082跟踪。

0 投票
0 回答
313 浏览

hive - 来自容器启动 exitCode=2 的异常,可能与 Flink SQL Hive 连接器有关

Flink (Scala) exitCode=2 之后mvn clean package

我有一个简单的 Flink 作业,它从 Hive 表的 2 列中读取mysource,将列相加,然后将结果写入另一个 Hive 表mysink,该表mysource有 2 列a bigintb bigint,并且mysink只有 1 列c bigint

作业提交成功,但是,我观察到它一直在重试。

在此处输入图像描述

我点击每一次尝试,他们只是展示了这一点。

但是,“日志”没有有用的信息——它抱怨日志库,但我相信它们确实是警告,而不是错误。

这是用 Scala 编写的工作。

这是 pom.xml。

这就是我打包jar的方式。

这就是我开展工作的方式。

通过 IntelliJ 的 Flink(Scala) 工作得很好?!

由于我的本地环境能够访问上述 Hive 环境,我尝试HiveToyExample通过单击“运行”按钮在 IntelliJ 中运行......它工作得很好!

PyFlink 重写工作得很好?!

由于逻辑如此简单,我用 PyFlink 重写了作业,看看会发生什么。这里显示了 PyFlink 重写。

这就是我运行 PyFlink 作业的方式。

令人惊讶的是,这项工作运行良好 - 它很快完成,结果写入mysink表格。

为什么?

鉴于比较,我非常怀疑第一次运行是否失败,因为它打包不正确,即使我遵循Flink Docs,这可以通过查看我的 pom 来验证。

如果您正在构建自己的程序,则您的 mvn 文件中需要以下依赖项。建议不要在生成的 jar 文件中包含这些依赖项。您应该在运行时添加上述依赖项。

此外,按照Flink 文档中的建议,我在我的 flink 发行版的 /lib 中包含了 flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar :

添加依赖项的推荐方法是使用捆绑的 jar。仅当捆绑的 jar 不能满足您的需要时,才应使用单独的 jar。

我想念什么?

0 投票
2 回答
285 浏览

apache-flink - Python Flink 连接远程 Flink 环境

我有在远程系统中运行的 flink 系统。说 IP 为 10.XX.XX.XX,端口为 6123。现在我想使用 RemoteExecution Environment 使用 Pyflink 从另一个系统连接。我看到了文档https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/table_environment.html但不清楚。请问有什么指点吗?

0 投票
1 回答
246 浏览

apache-flink - 将 func_type='pandas' 添加到 PyFlink UDF 会引发 ArrowTypeError('Did not pass numpy.dtype object'))

我有一个从 csv 文件(在 path 中)读取的 PyFlink 作业,data.txt对前 2 个整数列求和,然后打印结果。

这是 data.txt 文件。

batch-prediction.py这是运行 PyFlink 作业的文件(名为)。

运行python batch-prediction.py正常。

根据文档

用户只需要在装饰器中增加一个额外的参数func_type="pandas"。

但是,func_type="pandas"在 udf 中添加后,再运行python batch-prediction.py一次,就会抛出异常。

库版本:

  • apache-flink==1.12.0
  • pandas==0.25.3(通过 apache-flink 安装)
  • pyarrow==0.17.1

我想知道为什么?

0 投票
1 回答
118 浏览

apache-flink - PyFlink 矢量化 UDF 抛出 NullPointerException

我有一个机器学习模型,它需要两个 numpy.ndarray -usersitems- 并返回一个 numpy.ndarray predictions。在普通的 Python 代码中,我会这样做:

我正在考虑将此代码移植到 Flink 以利用其分布式特性。我的假设是:通过将预测工作负载分布在多个 Flink 节点上,我应该能够更快地运行整个预测。

所以我编写了一个 PyFlink 作业。注意我实现了一个调用predict来运行预测的 UDF。

这是UDF。

工作运行良好。但是,预测实际上是在表的每一行上运行的source,这并不高效。相反,我想将 80,000 个 (user_id, movie_id) 对拆分为 100 个批次,每个批次有 800 行。该作业触发model(users, items)函数 100 次(= 批次数),其中usersitems都有 800 个元素。

我找不到办法做到这一点。通过查看文档,矢量化的用户定义函数可能会起作用。

不幸的是,事实并非如此。

错误消息不是很有帮助。任何人都可以帮忙吗?谢谢!

注意:源代码可以在这里找到。要运行代码,您需要在本地使用 Anaconda,然后: