问题标签 [flink-batch]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
301 浏览

apache-flink - Flink 从 Hadoop 读取数据并发布到 Kafka

我需要从 HDFS 读取数据并将其发布到 Kafka 主题。因为它们是 DataSet 和 DataStream API 的一部分,是否有可能在一项工作中完成我正在寻找的事情?

0 投票
0 回答
91 浏览

apache-flink - flink作业执行过程中数据接收阶段是否需要很多时间?

我正在使用 VM,我的集群由 3 个任务管理器组成,主服务器也是作业管理器和任务管理器(4 个任务管理器和一个作业管理器),大约 12 GB 内存和 6 个处理器,使用 flink-1.7.2 版本,但在我运行 flink 之后工作

我发现作业仍在 datasink(collect) 上运行

0 投票
0 回答
99 浏览

amazon-web-services - flink readCSV 被“org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:Timeout waiting for connection from pool”抛出

我们正在使用 Flink 1.9.0 Dataset API 从 Amazon S3 Bucket 读取 CSV 文件。大多数时候都面临连接池超时。以下是 Flink 级别的配置

  1. 一次从 s3 读取 19708 个对象,因为我们需要在整个数据集之上应用逻辑。举个例子:假设有 20 个源文件夹,例如(AAA、BBB、CCC)和多个子文件夹(AAA/4May2020/../../1.csv、AAA/4May2020/../../2.csv、 AAA/3May2020/../../1.csv ,AAA/3May2020/../../2.csv ....),为了读取发生,在调用 readCSV 之前,逻辑扫描文件夹并选择仅具有最新日期文件夹的文件夹并将其传递以供阅读。对于读取操作,我们使用并行度为“5”。但是当执行图形成时,所有 20 个源都聚集在一起。

  2. 在 Kube-Aws 上运行,大约 10 个任务管理器托管在“m5.4X 大型机器”下。任务管理器 docker 分配有“8”个内核和“50GB”内存。

以下试图解决这个问题,但到目前为止没有运气。真的需要一些指示和帮助来解决这个问题

  • 启用 Flink 重试机制,将故障转移作为“区域”,有时重试可以通过。但即使重试,它也会间歇性地失败。
  • 根据 AWS 站点重新访问 core-site.xml: fs.s3a.threads.max :3000, fs.s3a.connection.maximum :4500 任何人都可以帮助解决以下问题

  • 无论如何要检查 readCSV 打开的 HTTP 连接是否
    已关闭

  • 任何了解数据集 ReadCSV 如何操作的指针都会有所帮助。
  • 有什么方法可以在读取之前引入等待机制?
  • 任何更好的方法来解决这个问题
0 投票
2 回答
567 浏览

java - 在对象类型与静态类型之间进行序列化时的性能差异

当要序列化时,我们是否需要静态类型/声明变量的数据类型?它在序列化时会提高任何性能吗?

我正在为批处理创建一个 flink 项目。我编写了一个自定义输入阅读器,它将通过 jdbc 从数据库中读取,并以 Hashmap 的形式返回一条记录,其中包含列名和值。我知道 Flink 在每个子任务之间序列化对象。所以,我的问题是,由于我有 hashmap 类型的值,它对序列化性能有什么影响吗?

Flink 默认使用 kyro 序列化器

0 投票
1 回答
241 浏览

apache-flink - 为什么以并行度 = 1 执行 Flink 作业是不好的?

我试图了解在提交 Flink 作业之前需要考虑哪些重要功能。

我的问题是并行数是多少,是否有上限(物理上)?并行性如何影响我的工作绩效?

例如,我有一个 CEP Flink 作业,它从非键控流中检测模式,并行数将始终为 1,除非我使用 KeyBy 运算符对数据流进行分区。

如果我错了,请纠正我:

如果我对数据流进行分区,那么我将拥有等于不同键的数量的并行度。但问题是模式匹配是针对每个键独立完成的,因此我无法定义需要来自具有不同键的 2 个分区的信息的模式。

0 投票
1 回答
379 浏览

java - Flink Java API - Pojo 类型到元组数据类型

我正在 JAVA flink API 上创建一个小实用程序来学习功能。我正在尝试读取 csv 文件并打印它,并且我已经为数据结构开发了一个 POJO 类。当我执行代码时,我看不到正确的值。(整数值被零和字符串的空值替换。如何映射属性的数据类型

我的主要课程:

我的 Pojo 类 (DataModel.class)

当我执行 main 方法时,我看不到正确的值。样本结果

正如我所期望的那样

这里可能缺少什么?

0 投票
2 回答
302 浏览

apache-flink - flink 1.7.2 数据集不支持kafka sink吗?

flink 1.7.2 数据集不支持 kafka sink 吗?

完成批处理操作后,我需要将消息发布到 kafka,这意味着 source 是我的 postgres,sink 是我的 kafka。

是否可以 ?

0 投票
1 回答
619 浏览

java - Apache Flink:为 DataStream API 添加侧输入

在我的Java应用程序中,我有三个 DataStreams。例如,一个流数据是从 Kafka 消费的,另一个流数据是从 Apache Nifi 消费的。对于这两个流的对象类型是不同的。例如,Stream-1 对象类型为 Person,Stream-2 对象类型为 Address。

第三个是广播流(因为这个数据是从 Kafka 消费的)。

现在我想将 Stream-1 和 Stream-2 组合在一个 Job 类中,并希望在任务流程元素中进行拆分。如何实施?

注意: Stream-1 是主流,Stream-2 是侧输入。MainStream 不断从 Kafka 获取数据。对于 Side Input,最初当应用程序启动时,所有表数据都从 DB 加载,然后在表数据更新时(不频繁)读取新数据。

样本结构:

我被称为以下链接。

DataStream API 的 FLIP-17 侧输入

jira/浏览/FLINK-6131

我的用例是:

使用缓慢演变的数据加入流:我们用于丰富的侧输入随着时间的推移而演变(数据从数据库中读取)。这可以通过在处理主输入之前等待一些初始数据可用并在新数据到达时不断地将新数据摄取到内部输入结构中来完成。

0 投票
1 回答
103 浏览

java - 如何向 Apache Flink 表添加新行

是否可以向 flink 表添加新记录/行?例如我有下表配置:

现在让我们假设稍后我有一个包含相同字段的附加数据集或 tuple3;名字,姓氏,年龄。如何将其添加到现有的 flink 订阅者表中?使用动态表或其他注册表的方式(例如:tableEnv.registerTemporaryView("subscribers",subscribers))会解决问题吗?如果不删除它并再次创建它,我无法向该表添加另一条记录,这太昂贵了。

请用java分享答案。

0 投票
1 回答
223 浏览

apache-flink - 在 source 开始时在 flink 中对整个 dataStream 进行分区,并保持分区直到 sink

我正在使用队列(Apache Pulsar)中的跟踪日志。我使用 5 keyedPrcoessFunction 并最终将有效负载下沉到 Postgres Db。我需要为每个 keyedProcessFunction 订购每个 customerId。现在我通过

processFunctionC 非常耗时,在最坏的情况下需要 30 秒才能完成。这会导致背压。我尝试为 processFunctionC 分配更多插槽,但我的吞吐量从未保持不变。它主要保持<每秒4条消息。

每个 processFunction 的当前插槽是

在 Flink UI 中,它显示从 processB 开始的背压,这意味着 C 非常慢。有没有办法在源本身使用应用分区逻辑并将每个任务的相同插槽分配给每个 processFunction。例如:

这将导致仅在少数任务中发生背压,并且不会扭曲由多个 KeyBy 引起的背压。

我能想到的另一种方法是将我的所有 processFunction 和 sink 组合成单个 processFunction 并将所有这些逻辑应用到 sink 本身中。