问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
74 浏览

apache-flink - PyFlink unix 纪元时间戳转换问题

我有带有 unix 纪元时间戳的事件,我正在使用带有 Kinesis 连接器的表作为源表。我需要使用与水印相同的时间戳字段。我如何在python中做到这一点?我正在使用 Flink-1.11 版本,因为那是最新的 AWS 支持。

活动形式: {'event_time': 1633098843692, 'ticker': 'AMZN'}

Python 表:

0 投票
1 回答
133 浏览

apache-flink - 在 Flink 中何时使用临时表或永久表

Flink 新手,我正在构建一个简单的聚合管道,例如每天的销售额。我正在使用表 api。我看到创建表有两种选择:临时表和永久表。对于永久表,我们还需要设置一个目录,例如 HIVE。所以我倾向于使用临时表,这很容易上手。但很好奇彼此的好与坏。

根据文档,当 Flink 作业停止时,临时表不存在。那么如果我们为 bug 修复进行 Flink Job 部署会发生什么。

谢谢!

0 投票
1 回答
83 浏览

apache-flink - FLINK:java.io.IOException:网络缓冲区数量不足

我正在尝试使用 flink 来丰富多个数据流的数据。

这里我在 account_stream 和 status_stream 中有一些数据。我想将该数据添加到来自多个不同来源的所有其他流中。所有的流在它们​​的数据中都有一个共同的字段:“account_id”。

这是我采取的方法。

我正在将必要的数据保存在状态中,并使用 flat_map 函数将其附加到所有流中。最后添加一个 kafka 接收器来发送所有丰富的状态流。

现在,一旦我执行此操作,就会收到此错误:'' java.io.IOException: Insufficient number of network buffers: required 17, but only 8 available. 网络缓冲区的总数当前设置为 2048 个,每个 32768 字节。''

我尝试在 flink 配置文件中将taskmanager.memory.network.fraction 更改为 0.5taskmanager.memory.network.max 更改为 15 gbtaskmanager.memory.process.size 更改为 10 gb 。但它仍然给出了同样的错误。除了保存它以查看更改反映在 flink 作业中之外,我是否还需要做其他事情?还是问题是别的?

还让我知道这种方法是否对任务无效,是否还有其他我应该尝试的方法?

我使用单个 32gb ram、8 核服务器在 python 中使用 pyflink 库运行它,kafka 和 elastic 在同一台服务器上运行。

谢谢你。

0 投票
1 回答
130 浏览

apache-flink - Flink 水印没有在 Python 中推进,卡在 -9223372036854775808

我在几个管道中遇到了这个问题,但一直找不到答案。当使用时间戳分配器为单调或越界时间戳分配水印策略的管道时,时间戳被正确提取并正在推进,但水印停留在 -9223372036854775808。我尝试在 pyflink 库中运行 event_time_timer.py 示例作为健全性检查,但经过检查,process_element 和 on_timer 方法都没有移动 -9223372036854775808 的水印。和 9223372036854775807 分别。

这是流程函数和时间戳分配器的代码:

这是主要功能:

无论我使用哪种水印策略,我的主管道都有同样的问题。水印不应该链接到时间戳并在 ProcessFunction.Context 中看到吗?

0 投票
0 回答
155 浏览

apache-kafka - pyflink on yarn, kafka, NoClassDefFoundError

我在纱线上部署了我的 pyflink 作业,它包括 kafka 消耗。

我的运行命令:

当我提交我的工作时,它得到这个输出错误:</p>

我的环境:jdk1.8.0_211 flink 1.13.2 centos 7.6

我的 jar 类路径:/opt/flink/lib flink-connector-kafka_2.12.1.13.2.jar flink-sql-connector-kafka_2.11.1.13.2.jar

我怀疑它在我的环境中找不到 kafka jar 库,而我使用 --jarfile 和 venv.zip 来声明,如何解决这个问题?非常感谢

0 投票
0 回答
96 浏览

python - 如何将 pyFlink 作业提交到远程 Kubernetes 会话集群?

目前,我有一个正在运行的 Flink Kubernetes 会话集群(Flink 版本 1.13.2),我也可以通过此命令从本地环境port-forward中提交 WordCount jar 示例。./bin/flink run -m localhost:8081 examples/batch/WordCount.jar

但是当我尝试通过命令提交 pyFlink 示例时,./bin/flink run -m localhost:8081 -py examples/python/table/batch/word_count.py作业冻结并且日志显示正在等待结果。

我尝试了很多方法,包括创建 virtualenv、传递 pyClientExecutable 和 pyexec、同步本地和远程 python 版本,但是它们都不起作用。

我错过了什么?如何将 python 示例提交到远程会话集群?

注意:当我在作业管理器 pod 中提交 pyFlink word_count 示例时,它运行没有任何问题。

0 投票
1 回答
68 浏览

python - 在 PyFlink 中使用消息键进行窗口分组

我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:

  • 从消息包含 UserId 的 Kafka 主题中读取数据
  • 对数据执行超过 2 秒的翻转窗口
  • 用我的 windows 值调用 Python UDF

这是我试图实现的数据流的可视化表示: 在此处输入图像描述

我正在使用 PyFlink 的 Table API,并且我的两个表都是使用 SQL DDL 声明的。

我的查询执行如下所示:

这是我的 Python UDF 函数:

我目前的问题是,由于某种原因,该my_udf函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。

我一直在研究 PyFlink 文档,但我无法找到如何实现我想要的。

该信息可能在文档中,但似乎我无法找到/理解它。

任何帮助,将不胜感激。

谢谢 !

0 投票
1 回答
157 浏览

python-3.x - PyFlink 简单管道。在类路径中找不到适合“org.apache.flink.table.factories.TableSourceFactory”的表工厂

我正在尝试使用 PyFlink 读取和写入 Kafka 来运行一个简单的示例。我收到的消息类型如下:

我收到的错误是:

我看过其他 Stackoverflow 问题,他们专注于可能缺少的那种罐子。但是,我已经添加了 Kafka 连接器 Jar,然后是 Json 一个。我不确定我是否缺少更多的罐子,因为我是 Flink 的新手。

如前所述,尽管未来的目标是根据每条消息的 EventTime 分析窗口,但现在我只是尝试将我从源主题收到的完全相同的消息发送到不同的 Kafka 主题. 到目前为止,请找到我根据找到的一些文档构建的代码。我试图删除水印(这是源表和接收表之间的唯一区别),但我仍然得到完全相同的错误:

如果有人可以在这个问题上帮助我,我将不胜感激,这应该与依赖关系有关,但信息不清楚。我试图更改代码,但到目前为止没有任何效果。

就像澄清一样,<userName>并由<IP>我的脚本中的正确值更改。

编辑: 我使用的 PyFlink 版本是apache-flink 1.12.5我用 Pip3 安装的。我之前尝试过使用最新版本的 PyFlink (1.14.0),但我意识到Dataproc当前运行 Flink 1.12.5

0 投票
0 回答
33 浏览

python - 如何在 PyFlink 中导入私有包

我正在尝试在 PyFlink 中导入一个私有包。

我有一个需要调用私有库中的函数的 UDF(不能通过 pip install 获得,因为 repo 是私有的)。

因此我不能使用这样的东西:

如何导入这个私有包以在我的 PyFlink UDF 函数中访问它?

0 投票
1 回答
201 浏览

pyspark - PyFlink 性能与 Scala 相比

PyFlink 性能与 Flink + Scala 相比如何?

大图。目标是使用冷热层构建 Lambda 架构。冷(批处理)层将使用 Apache Spark (PySpark) 实现。但是对于 Hot (Streaming) Tier,有不同的选择:Spark Streaming 或 Flink。

因此 Apache Flink 是纯流而不是 Spark 的微批处理,我倾向于选择 Apache Flink。但我唯一担心的是 PyFlink 的性能。它会比 PySpark 流式传输具有更少的延迟吗?是不是比 Scala 写的 Flink 代码慢?在什么情况下它会变慢?

先感谢您!