问题标签 [flink-batch]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-flink - Apache Flink 1.13 版将表转换为数据集?
我正在将一些为 Flink 1.5 版编写的遗留 Java 代码转换为 Flink 1.13.1 版。具体来说,我正在使用 Table API。我必须从 CSV 文件中读取数据,执行一些基本的 SQL,然后将结果写回文件。
对于 Flink 1.5 版本,我使用以下代码执行上述操作
为了将上述代码转换为 Flink 1.13.1 版本,我编写了以下代码
但是,BatchTableEnvironment
在 Flink 1.13 版本中被标记为“已弃用”。是否有任何替代方法可以转换Table
为Dataset
或直接将 a 写入Table
文件?
scala - 为什么 Flink 在独立集群中运行比在 IDE 中运行慢?
我已经在我的 IDE(Intellij)和独立集群中运行了我的 Flink 程序(在 Scala 中)。在我的程序中,我打印出运行时间。在 IDE 中运行时我得到了 20 秒,在独立集群中运行时我得到了 74 秒。我很困惑为什么在具有 10 个并行度的集群中运行这么多时间。我基本上是在尝试将 Flink 性能与 Spark 进行比较。有人可以帮助我了解它是如何发生的吗?谢谢你。
补充:
我的程序示例可以在这里找到。此特定代码在控制台中打印的时间如下:
- Flink(IDE,Windows):2550ms
- Flink(独立集群,WSL2 Ubuntu):9076ms
我已更改的 Flink 独立集群的配置:
- jobmanager.memory.process.size:2600m
- taskmanager.memory.process.size:5728m
- taskmanager.numberOfTaskSlots:20
- 并行度。默认值:4
- 其余配置使用默认值。
运行 flink jar : flink run --class flinkutils.generated.Test2Agg2Spark ./target/scala-2.12/executorflink_2.12-0.1.jar
apache-flink - 批处理执行模式下的 Apache Flink FileSink:进行中的文件不会转换为完成状态
我们正在尝试做的事情:我们正在评估 Flink 以在模式下使用DataStream APIBATCH
执行批处理。
重现问题的最小应用程序:
Flink 版本:1.12.2 或 1.13.0
预期结果:文件夹中的“最终”文件/Users/user1/output/
。
根据FileSink
文件:
鉴于 Flink 接收器和 UDF 通常不会区分正常作业终止(例如有限输入流)和因故障而终止,因此在作业正常终止时,最后一个正在进行的文件不会转换到“完成”状态.
模式的具体说明BATCH
:
在处理完整个输入之后,提交待处理的文件,即转换到完成状态。
实际结果:
以及以下例外:
我们想知道的是:Flink 是否可以在批处理模式下与FileSink
or结合使用StreamingFileSink
。
提前致谢!
apache-flink - 如何减少 Flink intra-jobs 之间的时间并避免重复任务
我在独立集群中运行了 Flink 有界作业。然后 Flink 将其分解为 3 个工作。一个工作完成后开始下一个工作大约需要 10 秒。如何减少工作之间的时间?当观察任务流程的细节时,我注意到第二个工作完成了第一个工作已经完成的相同任务,加上新的额外任务,等等 3rb 工作。例如,它反复从每个作业中的文件中读取数据,然后将其加入。为什么会这样?我是 Flink 的新用户。AFAIK,我们无法在 Flink 中缓存数据集。真的需要帮助来了解它是如何工作的。谢谢你。
apache-flink - 如何根据事件处理已经可用的状态在 flink 中来自不同的流
我们正在努力根据账户上的活动来推导账户的状态。我们根据帐户上的用户活动计算并保留 expiryOn 日期(表示帐户到期的暂定未来日期)。
我们有一个手动日期更改事件,它给出了一个日期,在该日期基础上,帐户的状态被发出为已过期。
我想知道实现这一目标的最佳方法是什么。所以,我的问题是,由于与计算到期日期相比,未来会发生日期更改事件,广播状态可以解决这个问题吗?如果是,请提出方法。或者,有没有像 Table API 这样的更好的方法来解决这个问题?
java - 将已处理文件从一个文件夹移动到 flink 中的另一个文件夹
我是 flink 的新手,在解决以下用例时面临一些挑战
用例说明:
我将在某个文件夹中每天收到一个带有时间戳的 csv 文件,例如输入。文件格式为 file_name_dd-mm-yy-hh-mm-ss.csv。
现在我的 flink 管道将逐行读取这个 csv 文件,并将其写入我的 Kafka 主题。
读取此文件的数据完成后,需要立即将其移动到另一个文件夹历史文件夹。
为什么我需要这个是因为:假设您的 Ververica 服务器突然或手动停止,并且如果您将所有已处理的文件都放在同一位置,那么在 ververica 重新启动后,flink 将重新读取它之前处理的所有文件。因此,为了防止这种情况,这些文件需要立即将已读取的文件移动到另一个位置。
我用谷歌搜索了很多,但没有找到任何东西,所以你能指导我实现这一目标。
让我知道是否需要其他任何东西。
java - 如何保持 flink 批处理作业在本地连续运行
我正在 Windows 10 机器上通过 flink 批处理机制练习文件读取。
我从 flink 的官网下载了 flink-1.7.2-bin-hadoop24-scala_2.12.tgz 并执行了 start-cluster.bat 。
我通过 Flink 的 UI 上传了 jar 并能够执行该作业,但该作业在几秒钟内完成。
我想保持工作连续运行,以便我可以测试我的用例。
你能指导我实现这一目标的可能方法吗?
apache-flink - 如何长时间保存 flink 日志?
我正在使用 apache flink 1.11.3 来运行我的 java 管道。我注意到失败的工作在几小时后就从历史记录中清除了。我们可以有任何参数来保留失败/完成的任务历史至少 7-10 天吗?
scala - Flink 作业不能在批处理作业中使用保存点
让我以一种通用的方式开始,看看我是否遗漏了一些概念:我有一个流式 flink 作业,我从中创建了一个保存点。这项工作的简化版本如下所示
伪代码:
只要我在没有保存点的情况下运行作业,它就可以正常工作。如果我从保存点开始工作,我会得到一个看起来像这样的异常
如果我设置选项,我可以解决这个问题:
但这最终会导致另一个错误:
当然,我尝试设置配置键taskmanager.memory.managed.consumer-weights
(used DATAPROC:70,PYTHON:30
),但这似乎没有任何效果。
所以我想知道我是否有概念错误并且无法在批处理作业中重用流作业中的保存点,或者我的配置是否存在问题。有什么提示吗?
apache-flink - Flink 批处理作业因文件较大而失败
我正在尝试运行批处理 Apache Beam 作业(通过 TensorFlow Extended - TFX 库)。这是一个批处理作业,它应该只从 S3 读取一些 CSV 文件,将它们转换为 TFRecords 格式(写回 s3)并收集有关数据集的统计信息。管道在非常小的数据集(几 MB)上运行良好,但是当我尝试在更大的数据集(~400 MB)上运行它时,作业 m 似乎卡住了(记录/字节数的指标Flink UI 停止增加),而我在 TaskManager 日志中看到重复错误:
Flink 集群的版本为 1.13.1,并作为原生 Kubernetes 集群部署在 AWS EKS 集群上。
我已将任务管理器和作业管理器的进程内存设置为 26 GB,所以我假设这里没有内存压力。
谢谢,戈尔扬