问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
293 浏览

apache-flink - PyFlink:调用已经关闭和 NullPointerException

我遇到了一个问题,即 PyFlink 作业最终可能会产生 3 个非常不同的结果,因为输入和运气的差异非常小:(

PyFlink 的工作很简单。它首先从 csv 文件中读取数据,然后使用利用sklearn.preprocessing.LabelEncoder. 我已经在GitHub repo中包含了所有必要的复制文件。

重现:

  • conda env create -f environment.yaml
  • conda activate pyflink-issue-call-already-closed-env
  • pytest验证ml_udf工作正常中定义的 udf
  • python main.py几次,你会看到多种结果

有3种可能的结果。

结果一:成功!

它以与结果 2 不同的顺序打印 90 行预期的行(见下文)。

结果2:通话已经结束

它首先打印 88 行预期的行,然后抛出异常抱怨java.lang.IllegalStateException: call already closed

这看起来类似于线程,但似乎问题已解决。

结果 3:NullPointerException

注意到奇怪之处后,我将 10 多行附加到users.csv.

这次作业首先打印 88(再次 88 - 幻数!)预期的行,然后抛出 NullPointerException。

NullPointerException 让我想起了这个问题,但我通过了test_ml_udf.py确保输入和输出类型的pandas.Series长度相同。

为什么?

0 投票
0 回答
43 浏览

apache-flink - PyFlink UDF:何时使用矢量化与标量

在向量化和标量 PyFlink UDF 之间做出决定时,是否有一组简单的规则可以遵循?

根据文档,向量化 UDF 具有以下优点:(1)较小的 ser-de 和调用开销(2)向量(尤其是数字,如果我理解正确的话)计算得到了高度优化,这要归功于 Numpy 等库。

矢量化 Python 用户定义函数是通过在 JVM 和 Python VM 之间以箭头列格式传输一批元素来执行的函数。由于序列化/反序列化开销和调用开销大​​大降低,向量化 Python 用户定义函数的性能通常远高于非向量化 Python 用户定义函数。此外,用户可以利用流行的 Python 库,如 Pandas、Numpy 等来实现矢量化 Python 用户定义函数。这些 Python 库经过高度优化并提供高性能的数据结构和函数。

问题 1:矢量化 UDF 是否总是首选?

假设,在我的用例中,我想简单地从 JSON 列中提取一些字段,但 Flink内置函数尚不支持,因此我需要定义我的 udf,如:

问题 2:在这种非数字情况下,我是否也会从矢量化 UDF 中受益?

0 投票
1 回答
234 浏览

apache-flink - Flink:无法将流下沉到 csv

我正在尝试使用 PyFlink 将流放入 csv 格式的文件系统中,但是它不起作用。

要运行脚本:

我希望记录转到 /tmp/output 文件夹,但是这不会发生。

有什么我想念的吗?

0 投票
1 回答
270 浏览

apache-kafka - 在pyflink中访问kafka时间戳

我正在尝试编写一个 Pyflink 应用程序来测量延迟和吞吐量。我的数据来自 kafka 主题的 json 对象,并DataStream使用SimpleStringSchema-class 加载到反序列化中。按照这篇文章的答案(如何在 Kafka 和 Flink 环境中测试性能?)我让 Kafka 生产者在事件中放置时间戳,但现在很难理解如何访问这些时间戳。我知道上面提到的帖子为这个问题提供了一个解决方案,但我正在努力将这个示例转移到 python,因为文档/示例很少。

这另一篇文章(Apache Flink:如何在摄取时间模式下获取事件的时间戳?)建议我应该定义一个ProcessFunction。但是,在这里我也不确定语法。我可能不得不做这样的事情(取自:https ://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job .py )

在这里做的正确方法是什么value.get_time_stamp()?或者是否有一种更简单的方法可以解决我不知道的问题?

谢谢!

0 投票
0 回答
435 浏览

apache-kafka - Pyflink 连接器卡夫卡

我是 Flink 和 kafka 等这些框架的初学者。我想做的是一个以kafka和Flink为基础设施核心的生产者和消费者。信息由 kafka 插入到主题中进行管理,然后从 flink(python so pyflink) 管理的信息中获取它们,然后将修改后的信息发送到使用消费者 kakfa 获取的不同 kafka 主题。我在这个基础设施中错过的是 pyflink 与 kafka 的设置。我怎样才能建立这个链接?我在网上搜索指南,但找到的代码不起作用。有人可以帮我打磨一下吗?

下面我写了我的kafka producer.py

下面我写了我的consumerKafka.py

如何使用 kafka 开发 pyflink 连接器,以便使用来自 batch_stream 主题的信息,然后操作并将信息刷新到 output_batch 中?

我尝试像上面那样进行开发,但存在一些问题。

0 投票
0 回答
38 浏览

python - Pyflink get_gateway() 方法不起作用

我正在使用 PyCharm 运行简单的 pyflink 最新 1.13 示例(它只有两行)。但该get_gateway()方法不起作用。

对于解释器,我创建了虚拟环境并在我的 PyCharm 项目中使用它。我还下载了Java8。我尝试了很多步骤来解决这个问题,但是

0 投票
0 回答
85 浏览

apache-flink - Flink Python API 是否支持计量指标?

我正在使用 PyFlink 进行流处理,并添加了一些指标来监控性能。

这是我用指标注册 udf 的代码。我已经安装了 apache-flink 1.13.0。

日志中没有引发异常,一旦我将类型更改self.gauge_value为str,它就会出错,所以我相信该行已执行并注册了仪表。

但是,我永远无法在 Web UI 的指标选项卡中找到仪表。我尝试在此测试 udf 中添加计数器和计量器,它们都正确显示在指标选项卡中。

PyFlink 支持仪表吗?如果没有,还有其他选择吗?

0 投票
1 回答
318 浏览

python - PyFlink - 如何使用 PyFlink 将数据推送到 mongodb 和 redis?

我是 PyFlink 的新手。

最近,我使用 PyFlink 完成了一个从 Kafka 读取流数据并将其插入另一个 Kafka 的功能。

现在,我想将数据推送到 mongodb 和 redis 中。但是我阅读了文档并在搜索引擎上搜索了这个问题,但没有得到任何有用的解决方案。

在文档中,有一些与Kafka,文件系统等的连接器。似乎没有针对mongodb和redis的解决方案。

那么,如何使用 PyFlink 将数据推送到 mongodb 和 redis 呢?

首先感谢您的帮助!

0 投票
2 回答
362 浏览

json - pyflink kafka 连接器将接收到的 json 数据反序列化为 null

我正在使用 PyFlink 创建一个流处理器。当我将 Kafka 连接到 Flink 时,一切正常。但是当我将 json 数据发送到 kafka 时,PyFlink 会接收到它,但反序列化器会将其转换为 null。PyFlink 代码是

卡夫卡生产者代码是

请让我知道如何在 PyFlink 中接收 json 数据

0 投票
1 回答
545 浏览

apache-flink - Python UDF 引发 py4j 异常的 Pyflink 设置有什么问题?

我正在使用文档中的 flink python 数据流教程:https ://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/datastream_tutorial/

环境

我的环境在 Windows 10 上。java -version给出:

我尝试了几种方法来为 flink 设置 python 环境。结果是一样的。

选项 1(使用 conda)

选项2(使用系统python +诗歌)

  • 在系统级别安装 python 3.8.10
  • pyproject.toml 依赖项:
  • 安装依赖项:

问题

最小的例子:

如果我从文档中运行示例,它就可以工作。一旦我使用 python udf 添加“map”操作,我就会收到 py4j 错误。这就是我得到的:

我试图找出 py4j 本身是否有效。但确实如此。这给我打印了两个数字,所以它似乎工作:

非常感谢任何要检查或测试的提示。