问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 在 Java Flink 应用程序中使用 Python 处理器
我有一个用例,我想用 Java 中的 Flink 实现 AWS Kinesis Data Application。它将通过 Data Streams API 监听多个 Kinesis 流。但是,这些流的分析将在 Python 中完成(因为我们的数据科学家更喜欢 Python)。
从这个答案,似乎支持从 Java 调用 Python UDF。但是,我希望能够将传入流转换为表格,通过
...然后有一个 Python 处理器被调用来处理该流。
我真的有3个问题:
- 这是受支持的用例吗?
- 如果是这样,是否有描述如何执行此操作的文档?
- 如果是这样,这是否会给应用程序增加大量开销?
python - Pyflink 表 API 流组窗口
我正在尝试在PyFlink
. 但是我A group window expects a time attribute for grouping in a stream environment.
尝试它时出错。我在窗口定义和选择中都有一个时间属性。
我究竟做错了什么?
python - PyFlink UDAF InternalRow 与 Row
我正在尝试通过自定义 UDAF 调用外部函数PyFlink
。我使用的函数要求数据在字典对象中。我尝试使用row(t.rowtime, t.b, t.c).cast(schema)
来达到这样的效果。
在 UDAF 之外,此表达式运行良好。在 UDAF 内部,此表达式被翻译成InternalRow
无法转换为字典对象的内容。
有没有办法强制 UDAF 使用Row
而不是InternalRow
?
输出:
python - 关于 PyFlink 使用 execute_sql 选择表并使用 table.print() 的问题
当我运行代码时,我在使用表格的打印功能时遇到了问题
我的csv文件内容是:</p>
源表:</p>
选择:</p>
错误:</p>
apache-flink - pyflink TypeError:需要一个整数
我们正在测试将 flink 用于 IOT 数据处理系统,当我尝试将简单函数映射到流上然后将结果写回 Kafka 时遇到错误。不幸的是,它不提供行号
当我查找此错误时,我没有找到任何有关 pyflink 的信息,尽管从其他项目中弹出的此错误来看,它们似乎指向 python 版本不匹配。在这种情况下,我可以看到 Flink 在我当前的虚拟环境中正确使用了 python,所以我不确定还要检查什么。我的代码如下。我能够运行一个从 Kafka 读取并打印的简单打印 flink 作业,并且最初插入 Kafka 的数据也是从 flink 作业产生的,所以我知道我至少可以在 Kafka 和 Flink 之间进行通信,进行序列化并正确反序列化。任何有关下一步调查内容的指示都会非常有帮助。
apache-flink - pyflink winerror 2 get_gateway()
按照链接描述的stackOverflow链接上的脚本,从某种意义上说,数据的唯一变化我想评估pyflink的简单用法,以分析它的安装是否有效。您可以在系统下方和为了安装它,我使用 pip -m install apache-flink 进入带有 python3.8.4 的虚拟环境。为了安装 flink 本身,我使用了 docker 并导入了 pyflink/playgrounds:1.10.0 的图像。下面是我修改的链接中唯一的部分
错误如下:
apache-flink - 如何在 PyFlink 中实现动态规则功能?
我的目标是实现基于动态规则的流数据集验证。我的项目正在使用 Pyflink。我知道 Flink 中有一个广播模式,但在 Python 中没有找到任何可靠的信息。Pyflink 中是否提供此功能?如果没有,是否有任何解决方法可以在 Pyflink 中实现动态规则
apache-flink - Flink Source kafka Join CDC source to kafka sink
我们正在尝试从 DB-cdc 连接器(upsert 行为)表加入。使用“kafka”事件源,通过现有的 cdc 数据键丰富这些事件。kafka-source (id, B, C) + cdc (id, D, E, F) = result(id, B, C, D, E, F) 放入kafka sink(追加)
问题是,这仅在我们的 kafka sink 是“upsert-kafka”时才有效。但这在数据库中删除时创建了墓碑。我们只需要表现得像普通事件,而不是变更日志。但我们不能只使用“kafka”接收器,因为 db 连接器是 upsert 所以不兼容......
这样做的方法是什么?将 upsert 转换为仅附加事件?
不要介意 Mongo-cdc 连接器,它是新的,但可以用作 mysql-cdc 或 postgre-cdc。
谢谢你的帮助!
apache-kafka - pyflink debezium kafka 抛出 NullPointerException
我正在尝试通过 python 连接到 Flink。数据源是postgresql的CDC(Change-Data-Capture)。
我正在使用的是:
- postgresql==10.1
- wal2json
- 卡夫卡==2.12-2.1.0
- flink==1.13.0
- debezium==1.13.1.Final
卡夫卡运行良好。消费者可以获得CDC消息。但是 Flink 抛出了这个错误。这个恼人的问题困扰了我一个星期。
我的代码:
错误报告如下:
Py4JJavaError: 调用 z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame 时出错。:java.lang.RuntimeException:无法在 org.apache.flink.streaming.api.operators 的 org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) 处获取下一个结果。 collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) 在 org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) 在 org.apache.flink.table.runtime.arrow。 ArrowUtils.filterOutRetractRows(ArrowUtils.java:735) at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:673) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect。 NativeMethodAccessorImpl。在 org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next( CollectResultFetcher.java:120) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ... 15 更多原因:java.util.concurrent.ExecutionException: org.apache .flink.runtime.client.JobExecutionException:作业执行失败。在 java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 在 java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) 在 org.apache.flink.streaming.api.operators.collect.CollectResultFetcher .getAccumulatorResults(CollectResultFetcher.java:175) ... 17 更多原因:org.apache。flink.runtime.client.JobExecutionException:作业执行失败。在 org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 在 org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) 在 java.util。 concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) at org.apache .flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174) ... 17 更多原因: org.apache.flink.runtime.JobException:46274339048152,"xmin":null},"op":"c","ts_ms":1624873855079,"transaction":null}}'。在 org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:173) 在 org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) 在 org .apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org .apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink .
我认为关键原因是:
引起:java.lang.NullPointerException
但是谷歌没有一个经典的答案。大多数答案对我来说都没用。
感谢任何建议。
apache-flink - pyflink 配置错误 SQL 解析失败
我想确保 flink 在设置方面有效,然后尝试使使用复杂化。在最简单的例子中,我尝试做这些事情。输入包含
column_a,column_b 1,2
输出存在。为了使用 docker 为我的应用程序下载 1.10 的 pyflink 版本,我使用以下代码片段:
产生的错误如下