问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 得到“pyflink.util.exceptions.TableException:findAndCreateTableSource 失败。” 运行 PyFlink 示例时
我在 PyFlink 程序下运行(从https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html复制)
为了验证它是否有效,我按顺序执行了以下操作:
- 跑
echo -e "flink\npyflink\nflink" > /tmp/input
- 跑
python WordCount.py
- 运行
cat /tmp/out
并找到预期的输出
然后我稍微改变了我的 PyFlink 程序,使其更喜欢 SQL 而不是 Table API,但我发现它不起作用。
这是错误:
我想知道我的新程序有什么问题?
apache-flink - 为什么 Flink FileSystem sink 拆分成多个文件
我想使用 Flink 从输入文件中读取,进行一些聚合,然后将结果写入输出文件。作业处于批处理模式。见wordcount.py
下文:
在运行之前python wordcount.py
,我运行echo -e "flink\npyflink\nflink" > /tmp/input
以确保 /tmp/input 中存在数据。但是,运行后,/tmp/output 中有两个文件:
虽然我希望单个文件 /tmp/output 包含内容:
实际上,我通过调整以下生成单个文件 /tmp/output.
运行此版本将生成 /tmp/output。请注意,它不带有逗号分隔符。
知道为什么吗?谢谢!
apache-flink - PyFlink java.io.EOFException 在 java.io.DataInputStream.readFully
我有一个 PyFlink 作业,它从文件中读取,根据条件过滤并打印。这是tree
我的工作目录的视图。这是 PyFlink 脚本 main.py:
运行main.py
:
- 确保在我的 conda env 中安装 pyflink==1.12.0
- /tmp/input 有单行内容
{"id":1,"tag":"a"}
然后我跑main.py
了,我得到了异常:
该问题可能与udf有关。有什么帮助吗?谢谢!
apache-flink - PyFlink 数据流 API 支持窗口化
Apache Flink 的 Python SDK ( PyFlink ) Datastream API 是否支持 Windowing 等运算符?到目前为止,我看到的任何使用 PyFlink 进行窗口化的示例都使用 Table API。Datastream API确实支持这些运算符,但看起来这些还不能通过 PyFlink 获得?
谢谢!
apache-flink - 无法对 PyFlink UDF 进行单元测试
我正在使用 PyFlink,我想对我用 Python 编写的 UDF 进行单元测试。
要测试下面的简单 udf:
我创建了一个应该失败的测试文件:
可悲的是,如果我运行它就会通过pytest
:
@udf(input_types=[...], result_type=...)
但是,如果我从 udf.py中删除注释,测试将按预期失败。
结果:
apache-flink - PyFlink 从 JSON 数组中提取嵌套字段
我正在尝试从从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每条记录都有一个Result
对象,其中有一个名为 的对象数组data
。我正在尝试value
从第一个数组元素中提取字段,即data[0]
.
我正在使用 Table API 从 Kafka 主题中读取数据并将提取的字段写入另一个主题。
源DDL如下:
对应的 sink DDL 为:
value
这是从数组的第一个元素中提取字段的代码片段:
execute_insert
当我执行此步骤时,我在步骤中看到以下错误。
但是,如果我不提取嵌入的value
,而是提取数组的整行,即table.Result.data.at(1)
并sink_ddl
适当地修改,我就能够正确地获取整行。
任何想法,我错过了什么?感谢您的任何指点!
编辑:这可能是 Flink 中的一个错误,它被https://issues.apache.org/jira/browse/FLINK-22082跟踪。
hive - 来自容器启动 exitCode=2 的异常,可能与 Flink SQL Hive 连接器有关
Flink (Scala) exitCode=2 之后mvn clean package
我有一个简单的 Flink 作业,它从 Hive 表的 2 列中读取mysource
,将列相加,然后将结果写入另一个 Hive 表mysink
,该表mysource
有 2 列a bigint
和b bigint
,并且mysink
只有 1 列c bigint
。
作业提交成功,但是,我观察到它一直在重试。
我点击每一次尝试,他们只是展示了这一点。
但是,“日志”没有有用的信息——它抱怨日志库,但我相信它们确实是警告,而不是错误。
这是用 Scala 编写的工作。
这是 pom.xml。
这就是我打包jar的方式。
这就是我开展工作的方式。
通过 IntelliJ 的 Flink(Scala) 工作得很好?!
由于我的本地环境能够访问上述 Hive 环境,我尝试HiveToyExample
通过单击“运行”按钮在 IntelliJ 中运行......它工作得很好!
PyFlink 重写工作得很好?!
由于逻辑如此简单,我用 PyFlink 重写了作业,看看会发生什么。这里显示了 PyFlink 重写。
这就是我运行 PyFlink 作业的方式。
令人惊讶的是,这项工作运行良好 - 它很快完成,结果写入mysink
表格。
为什么?
鉴于比较,我非常怀疑第一次运行是否失败,因为它打包不正确,即使我遵循Flink Docs,这可以通过查看我的 pom 来验证。
如果您正在构建自己的程序,则您的 mvn 文件中需要以下依赖项。建议不要在生成的 jar 文件中包含这些依赖项。您应该在运行时添加上述依赖项。
此外,按照Flink 文档中的建议,我在我的 flink 发行版的 /lib 中包含了 flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar :
添加依赖项的推荐方法是使用捆绑的 jar。仅当捆绑的 jar 不能满足您的需要时,才应使用单独的 jar。
我想念什么?
apache-flink - Python Flink 连接远程 Flink 环境
我有在远程系统中运行的 flink 系统。说 IP 为 10.XX.XX.XX,端口为 6123。现在我想使用 RemoteExecution Environment 使用 Pyflink 从另一个系统连接。我看到了文档https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/table_environment.html但不清楚。请问有什么指点吗?
apache-flink - 将 func_type='pandas' 添加到 PyFlink UDF 会引发 ArrowTypeError('Did not pass numpy.dtype object'))
我有一个从 csv 文件(在 path 中)读取的 PyFlink 作业,data.txt
对前 2 个整数列求和,然后打印结果。
这是 data.txt 文件。
batch-prediction.py
这是运行 PyFlink 作业的文件(名为)。
运行python batch-prediction.py
正常。
根据文档:
用户只需要在装饰器中增加一个额外的参数func_type="pandas"。
但是,func_type="pandas"
在 udf 中添加后,再运行python batch-prediction.py
一次,就会抛出异常。
库版本:
- apache-flink==1.12.0
- pandas==0.25.3(通过 apache-flink 安装)
- pyarrow==0.17.1
我想知道为什么?
apache-flink - PyFlink 矢量化 UDF 抛出 NullPointerException
我有一个机器学习模型,它需要两个 numpy.ndarray -users
和items
- 并返回一个 numpy.ndarray predictions
。在普通的 Python 代码中,我会这样做:
我正在考虑将此代码移植到 Flink 以利用其分布式特性。我的假设是:通过将预测工作负载分布在多个 Flink 节点上,我应该能够更快地运行整个预测。
所以我编写了一个 PyFlink 作业。注意我实现了一个调用predict
来运行预测的 UDF。
这是UDF。
工作运行良好。但是,预测实际上是在表的每一行上运行的source
,这并不高效。相反,我想将 80,000 个 (user_id, movie_id) 对拆分为 100 个批次,每个批次有 800 行。该作业触发model(users, items)
函数 100 次(= 批次数),其中users
和items
都有 800 个元素。
我找不到办法做到这一点。通过查看文档,矢量化的用户定义函数可能会起作用。
不幸的是,事实并非如此。
错误消息不是很有帮助。任何人都可以帮忙吗?谢谢!
注意:源代码可以在这里找到。要运行代码,您需要在本地使用 Anaconda,然后: