问题标签 [pyflink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - 如何在 Flink 中使用 table API 消费两个源流
我有一个使用 python 表 api 的 Flink 作业。现在我的应用程序将消耗一个额外的源流。我很好奇使用表 API 使用多个源流的推荐方法是什么。
附加信息:
两个输入流只是两个事件源。我想将它们与窗口操作聚合在一起。就像DataStream 中的联合操作
谢谢!
python - 在 pyflink 中找不到用户定义的指标
pyflink 的用户定义指标定义如下。
提交到 flink 集群后,我调用了 metrics api,但找不到 'test_group' 或 'current_offset' 指标。我在仪表板上也找不到。http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/metrics http://localhost:8081/jobs/a680d06c3957d484700878c47fe5d5bd/vertices/e3dfc0d7e9ecd8a43f85f0b68ebf3b80/metrics
flink 版本为 1.13.1 ,python 版本为 3.7 。
我应该怎么办?
python - 运行 Flink 的 python 示例
我正在尝试运行 Apache Flink 1.14.0 附带的 python 示例,但不断出现错误。我按照说明安装,.jar
示例工作正常,所以我不确定 python 有什么问题。
例如,该命令./bin/flink run --python /Users/[...]/flink-1.14.0/examples/python/datastream/word_count.py
会生成以下异常:
2021-11-05 23:04:11 org.apache.flink.runtime.JobException:恢复被 org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) 的 NoRestartBackoffTimeStrategy 抑制在 org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) 在 org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) 在 org.apache .flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) 在 org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) 在 org.apache.flink.runtime.scheduler.SchedulerBase .updateTaskExecutionState(SchedulerBase.java:679) 在 org.apache.flink.runtime.scheduler.SchedulerNG。updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java。 base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect。 Method.invoke(Method.java:564) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils .runWithContextClassLoader(ClassLoadingUtils.java:83) 在 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) 在 org.apache.flink.runtime.rpc.akka.AkkaRpcActor。handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor. java:163) 在 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 在 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 在 scala.PartialFunction.applyOrElse(PartialFunction.scala:123 ) 在 scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 在 scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 在 akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 在 scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor。aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell. scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) 在 java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016) 在 java.base/java。 util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665) 在 java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598) 在 java.base/java.util.concurrent.ForkJoinWorkerThread.run( ForkJoinWorkerThread.java:183) 原因:org.apache.flink 中的 java.lang.ExceptionInInitializerError。streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:401) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:243)在 org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126) 在 org.apache.org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)。 flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72) 在 org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555) 在 org.apache.flink.streaming.api.runners。 python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:246) 在 org.apache.flink.streaming.api.operators.python。AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:131) at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116) at org.apache.flink.streaming.api.operators.python。 PythonProcessOperator.open(PythonProcessOperator.java:59) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates( StreamTask.java:711) 在 org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask. java:687) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache .flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run (Thread.java:832) 原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 org.apache 的 14 .beam.runners.core.construction.Environments.getJavaVersion(Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners .core.construction.Environments.(Environments.java:119) ... 20 更多654) 在 org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 在 org.apache.flink .runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread .java:832) 原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 org.apache.beam 的 14 .runners.core.construction.Environments.getJavaVersion(Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core .construction.Environments.(Environments.java:119) ... 20 更多654) 在 org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 在 org.apache.flink .runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread .java:832) 原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 org.apache.beam 的 14 .runners.core.construction.Environments.getJavaVersion(Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core .construction.Environments.(Environments.java:119) ... 20 更多org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 的 org.apache.flink.runtime.taskmanager 的 apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)。 Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:832)原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 org.apache.beam.runners.core 的 14。 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 中的 construction.Environments.getJavaVersion(Environments.java:355)。 (Environments.java:119) ... 20 更多org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 的 org.apache.flink.runtime.taskmanager 的 apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)。 Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:832)原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 org.apache.beam.runners.core 的 14。 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 中的 construction.Environments.getJavaVersion(Environments.java:355)。 (Environments.java:119) ... 20 更多taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task. java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:832) 原因:java.lang .UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 14 处 org.apache.beam.runners.core.construction.Environments.getJavaVersion( Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119 ) ... 20 更多taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task. java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:832) 原因:java.lang .UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 的 14 处 org.apache.beam.runners.core.construction.Environments.getJavaVersion( Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119 ) ... 20 更多runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 在 java.base/java.lang.Thread.run(Thread.java:832) 引起:java.lang.UnsupportedOperationException:不支持Java 版本: org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 中的 14,在 org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java: 355) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) 的 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) ...还有 20 个runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 在 java.base/java.lang.Thread.run(Thread.java:832) 引起:java.lang.UnsupportedOperationException:不支持Java 版本: org.apache.beam.runners.core.construction.Environments$JavaVersion.forSpecification(Environments.java:106) 中的 14,在 org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java: 355) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) 的 org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) ...还有 20 个restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)在 java.base/java.lang.Thread.run(Thread.java:832) 引起:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion 的 14。 forSpecification(Environments.java:106) at org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java:355) at org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments. java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) ... 20 更多restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)在 java.base/java.lang.Thread.run(Thread.java:832) 引起:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction.Environments$JavaVersion 的 14。 forSpecification(Environments.java:106) at org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java:355) at org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments. java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) ... 20 更多java:575) 在 java.base/java.lang.Thread.run(Thread.java:832) 原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction 的 14。 Environments$JavaVersion.forSpecification(Environments.java:106) 在 org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments。 getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) ... 20 更多java:575) 在 java.base/java.lang.Thread.run(Thread.java:832) 原因:java.lang.UnsupportedOperationException:不支持的 Java 版本:org.apache.beam.runners.core.construction 的 14。 Environments$JavaVersion.forSpecification(Environments.java:106) 在 org.apache.beam.runners.core.construction.Environments.getJavaVersion(Environments.java:355) 在 org.apache.beam.runners.core.construction.Environments。 getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) 在 org.apache.beam.runners.core.construction.Environments.(Environments.java:119) ... 20 更多Environments.getJavaVersion(Environments.java:355) at org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) at org.apache.beam.runners.core.construction.Environments.(Environments .java:119) ... 20 多个Environments.getJavaVersion(Environments.java:355) at org.apache.beam.runners.core.construction.Environments.getDefaultJavaSdkHarnessContainerUrl(Environments.java:402) at org.apache.beam.runners.core.construction.Environments.(Environments .java:119) ... 20 多个
python - pyflink 有 max 函数还是 max_by 函数?
- 我找不到对应的函数 max 或 max_ by
** 这是我的消息 **
Java 结果
我想在 Python 中做到这一点
Python 这是一条错误消息
* Traceback(最近一次调用最后):文件“/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py”,第 15 行,在 f_map() 文件“/Users/yinbaidong/PycharmProjects/pythonProject2/Test/Rolling.py ",第 10 行,在 f_map lambda i: i.get("id")).max("temperature") AttributeError: 'KeyedStream' 对象没有属性 'max'
这是我的测试数据 wudu.txt
帮我
apache-flink - Pyflink 1.14 数据流源 -> 熊猫处理 -> 数据流接收器
我最近在 Pyflink 中看到了可以通过 Table API 在 flink 中使用 pandas 数据帧的文档。因此,我的目标是:
- 从 Kafka 源接收数据流
- 转换为表 API 实例 -> 然后可以转换为 Pandas
- --- Pandas 处理逻辑
- 将 pandas 数据帧转换回 Table 实例
- 然后将其转换回数据流并沉入kafka
根据 flink 文档,我引用了转换Datastream <-> Table Instance和Table <-> pandas之间的代码。
将其提交给 flink 时,将创建作业而不会出现任何错误或异常:
$ /opt/flink/bin/flink 运行 --python script.py
但是在flink UI上可以看到job name没有分别注册
以及未反映在输出主题上的熊猫逻辑。(1)作为源数据包接收的 json 数据包,(2) pandas 本质上为数据包添加了一个新值,(3)然后应该将此数据包接收回输出主题
收到的源主题:
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
目标主题的输出('testing': 'New vals' not added):
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
如果我的方法不正确,有人可以告诉我正确的实施方式吗?这应该作为无界流操作工作(不是作为批处理操作,如果我的术语在这里正确......)
apache-flink - PyFlink SQL 本地测试
所以我有一个用 PyFlink SQL API 编写的简单聚合作业。该作业从 AWS kinesis 读取数据并将结果输出到 Kinesis。
我很好奇我是否可以用 pytest 对我的管道进行单元测试?我猜我需要用文件系统连接器模拟源和接收器?但是如何创建本地 Flink 会话来在 pytest 中运行作业?我们在这里有最佳实践建议吗?
谢谢!
apache-kafka - Pyflink 1.14 表连接器 - Kafka 身份验证
我只看到了 kafka 连接的 Pyflink 表 API 示例,它在连接建立(doc ref )中不包含身份验证详细信息,即源表连接:
但是,我需要连接到启用身份验证的 kafka 源。通过“解释”所有property.XXX专用于 kafka 配置,我将示例更改如下并进行了测试:
通过从 cli 添加此作业,没有响应或指示使用相应的作业 ID 实例化作业:
查看 flink UI 时分别没有创建作业
如果我错误地配置了连接,有人可以纠正我,或者指出我的相关文档来源(我已经用谷歌搜索了很多......)
python - Pyflink:组窗口需要一个时间属性来在流环境中进行分组
我使用 pyflink 1.11 有以下代码:
但我不断收到此错误:
有人可以帮我理解我做错了什么吗?不确定它是否重要,但它event_timestamp
是"2021-11-03 20:24:46.095000"
格式。
先感谢您!
apache-flink - Flink SQL Tumble 聚合结果未写入本地文件系统
语境
我有一个由 python SQL api 编码的 Flink 作业。它使用来自 Kinesis 的源数据并将结果生成给 Kinesis。我想进行本地测试以确保 Flink 应用程序代码正确。所以我用文件系统连接器模拟了源 Kinesis 和接收器 Kinesis。然后在本地运行测试管道。虽然本地的 flink 作业总是运行成功。但是当我查看接收器文件时。接收器文件始终为空。当我在“Flink SQL Client”中运行代码时也是如此。
这是我的代码:
好奇我在这里做错了什么。
笔记:
- 我正在使用 Flink 1.11.0
- 如果我直接将数据从源转储到接收器而不进行窗口和分组,它工作正常。这意味着源和汇表设置正确。所以似乎问题在于本地文件系统的翻滚和分组。
- 此代码适用于 Kinesis 源和接收器。
apache-flink - 如何在 PyFlink 中使用 Table API 读取多个目录?
我想在 PyFlink 中使用 Table API 读取多个目录,
但失败并出现以下错误:
我确定这三个目录存在并且我有权访问它:
/opt/data/day=2021-11-14、
/opt/data/day=2021-11-15、
/opt/data/day=2021 -11-16
如果不能读取多个目录,我必须创建三个表并将它们合并,这更加冗长。
任何建议都是值得赞赏的。谢谢