问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - PyFlink:调用已经关闭和 NullPointerException
我遇到了一个问题,即 PyFlink 作业最终可能会产生 3 个非常不同的结果,因为输入和运气的差异非常小:(
PyFlink 的工作很简单。它首先从 csv 文件中读取数据,然后使用利用sklearn.preprocessing.LabelEncoder
. 我已经在GitHub repo中包含了所有必要的复制文件。
重现:
conda env create -f environment.yaml
conda activate pyflink-issue-call-already-closed-env
pytest
验证ml_udf
工作正常中定义的 udfpython main.py
几次,你会看到多种结果
有3种可能的结果。
结果一:成功!
它以与结果 2 不同的顺序打印 90 行预期的行(见下文)。
结果2:通话已经结束
它首先打印 88 行预期的行,然后抛出异常抱怨java.lang.IllegalStateException: call already closed
。
这看起来类似于线程,但似乎问题已解决。
结果 3:NullPointerException
注意到奇怪之处后,我将 10 多行附加到users.csv
.
这次作业首先打印 88(再次 88 - 幻数!)预期的行,然后抛出 NullPointerException。
NullPointerException 让我想起了这个问题,但我通过了test_ml_udf.py
确保输入和输出类型的pandas.Series
长度相同。
为什么?
apache-flink - PyFlink UDF:何时使用矢量化与标量
在向量化和标量 PyFlink UDF 之间做出决定时,是否有一组简单的规则可以遵循?
根据文档,向量化 UDF 具有以下优点:(1)较小的 ser-de 和调用开销(2)向量(尤其是数字,如果我理解正确的话)计算得到了高度优化,这要归功于 Numpy 等库。
矢量化 Python 用户定义函数是通过在 JVM 和 Python VM 之间以箭头列格式传输一批元素来执行的函数。由于序列化/反序列化开销和调用开销大大降低,向量化 Python 用户定义函数的性能通常远高于非向量化 Python 用户定义函数。此外,用户可以利用流行的 Python 库,如 Pandas、Numpy 等来实现矢量化 Python 用户定义函数。这些 Python 库经过高度优化并提供高性能的数据结构和函数。
问题 1:矢量化 UDF 是否总是首选?
假设,在我的用例中,我想简单地从 JSON 列中提取一些字段,但 Flink内置函数尚不支持,因此我需要定义我的 udf,如:
问题 2:在这种非数字情况下,我是否也会从矢量化 UDF 中受益?
apache-flink - Flink:无法将流下沉到 csv
我正在尝试使用 PyFlink 将流放入 csv 格式的文件系统中,但是它不起作用。
要运行脚本:
我希望记录转到 /tmp/output 文件夹,但是这不会发生。
有什么我想念的吗?
apache-kafka - 在pyflink中访问kafka时间戳
我正在尝试编写一个 Pyflink 应用程序来测量延迟和吞吐量。我的数据来自 kafka 主题的 json 对象,并DataStream
使用SimpleStringSchema
-class 加载到反序列化中。按照这篇文章的答案(如何在 Kafka 和 Flink 环境中测试性能?)我让 Kafka 生产者在事件中放置时间戳,但现在很难理解如何访问这些时间戳。我知道上面提到的帖子为这个问题提供了一个解决方案,但我正在努力将这个示例转移到 python,因为文档/示例很少。
这另一篇文章(Apache Flink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个ProcessFunction
。但是,在这里我也不确定语法。我可能不得不做这样的事情(取自:https ://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job .py )
在这里做的正确方法是什么value.get_time_stamp()
?或者是否有一种更简单的方法可以解决我不知道的问题?
谢谢!
apache-kafka - Pyflink 连接器卡夫卡
我是 Flink 和 kafka 等这些框架的初学者。我想做的是一个以kafka和Flink为基础设施核心的生产者和消费者。信息由 kafka 插入到主题中进行管理,然后从 flink(python so pyflink) 管理的信息中获取它们,然后将修改后的信息发送到使用消费者 kakfa 获取的不同 kafka 主题。我在这个基础设施中错过的是 pyflink 与 kafka 的设置。我怎样才能建立这个链接?我在网上搜索指南,但找到的代码不起作用。有人可以帮我打磨一下吗?
下面我写了我的kafka producer.py
下面我写了我的consumerKafka.py
如何使用 kafka 开发 pyflink 连接器,以便使用来自 batch_stream 主题的信息,然后操作并将信息刷新到 output_batch 中?
我尝试像上面那样进行开发,但存在一些问题。
python - Pyflink get_gateway() 方法不起作用
我正在使用 PyCharm 运行简单的 pyflink 最新 1.13 示例(它只有两行)。但该get_gateway()
方法不起作用。
对于解释器,我创建了虚拟环境并在我的 PyCharm 项目中使用它。我还下载了Java8。我尝试了很多步骤来解决这个问题,但是
apache-flink - Flink Python API 是否支持计量指标?
我正在使用 PyFlink 进行流处理,并添加了一些指标来监控性能。
这是我用指标注册 udf 的代码。我已经安装了 apache-flink 1.13.0。
日志中没有引发异常,一旦我将类型更改self.gauge_value
为str,它就会出错,所以我相信该行已执行并注册了仪表。
但是,我永远无法在 Web UI 的指标选项卡中找到仪表。我尝试在此测试 udf 中添加计数器和计量器,它们都正确显示在指标选项卡中。
PyFlink 支持仪表吗?如果没有,还有其他选择吗?
python - PyFlink - 如何使用 PyFlink 将数据推送到 mongodb 和 redis?
我是 PyFlink 的新手。
最近,我使用 PyFlink 完成了一个从 Kafka 读取流数据并将其插入另一个 Kafka 的功能。
现在,我想将数据推送到 mongodb 和 redis 中。但是我阅读了文档并在搜索引擎上搜索了这个问题,但没有得到任何有用的解决方案。
在文档中,有一些与Kafka,文件系统等的连接器。似乎没有针对mongodb和redis的解决方案。
那么,如何使用 PyFlink 将数据推送到 mongodb 和 redis 呢?
首先感谢您的帮助!
json - pyflink kafka 连接器将接收到的 json 数据反序列化为 null
我正在使用 PyFlink 创建一个流处理器。当我将 Kafka 连接到 Flink 时,一切正常。但是当我将 json 数据发送到 kafka 时,PyFlink 会接收到它,但反序列化器会将其转换为 null。PyFlink 代码是
卡夫卡生产者代码是
请让我知道如何在 PyFlink 中接收 json 数据
apache-flink - Python UDF 引发 py4j 异常的 Pyflink 设置有什么问题?
我正在使用文档中的 flink python 数据流教程:https ://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/datastream_tutorial/
环境
我的环境在 Windows 10 上。java -version
给出:
我尝试了几种方法来为 flink 设置 python 环境。结果是一样的。
选项 1(使用 conda):
选项2(使用系统python +诗歌):
- 在系统级别安装 python 3.8.10
- pyproject.toml 依赖项:
- 安装依赖项:
问题
最小的例子:
如果我从文档中运行示例,它就可以工作。一旦我使用 python udf 添加“map”操作,我就会收到 py4j 错误。这就是我得到的:
我试图找出 py4j 本身是否有效。但确实如此。这给我打印了两个数字,所以它似乎工作:
非常感谢任何要检查或测试的提示。