问题标签 [flink-batch]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
134 浏览

scala - 如何更改 Flink DataSet 的默认分区数?

这里有个需求:数据集太大,我们需要对数据进行分区,在每个分区计算一个本地结果,然后合并。例如,如果有 100 万条数据,分成 100 个分区,那么每个副本将只有大约 10000 条数据。由于需要使用分区数进行调优,因​​此要求分区数是可变的。另外,一个分区的所有数据都必须批量计算,不能一一计算。

实现如下:在分区阶段之后,每条数据都会有一个key来表示它所属的分区。现在,数据应该是这样的:afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]。 接下来,使用 Flink 的partitionCustommapPartition运算符。

但是,却报错:

这似乎是因为 Flink 默认的分区DataSet数是 CPU 的个数,在我的电脑上是 6 个,所以会报错 java.lang.ArrayIndexOutOfBoundsException : 6

所以我的问题是:有没有办法随意改变分区的数量?Partition (key: int, numpartitions: int)我在 API Partitioner的方法中找到了这个参数,但不知道如何更改它。

有没有办法改变DataSet分区的数量?

Flink 版本为 1.6,测试代码为:

谢谢!

0 投票
0 回答
87 浏览

java - 如何在通过 flink cli 运行的 flink 应用程序 jar 中指定外部属性文件

我正在运行一个 flink 应用程序,并希望我的 flink 应用程序引用在 flink 应用程序启动期间给出的外部属性文件。

我尝试了以下方法:-

0 投票
1 回答
46 浏览

apache-flink - 集群在 statefun 上运行时是否可以添加新的嵌入式工作人员?

这是交易;

我正在处理在运行集群(flink statefun 2.2.1)时添加新的工作人员(嵌入式)。

如您所见,新的任务管理器可以注册到集群;

新部署的任务管理器的屏幕截图

但它没有初始化(它没有部署源);

我在这里想念什么?(master和workers也必须相同的jar文件?或者使用jar文件部署taskmanager就足够了)

任何帮助将不胜感激,谢谢。

0 投票
1 回答
62 浏览

java - Apache Flink 和将工作分配给任务

我有一个与 apache flink 相关的问题。目前我正在研究 apache flink 作为我们的工作框架。

本质上,我们有一个内容摄取工作,我们从提要中摄取一些内容,这些内容本质上是分页的。提要的内容可以是 xml 或 json 格式,每页可以有 1000 条记录。

我需要的是把这 1000 条记录分成任务并并行处理。

我知道 flink 有一些方法可以并行化它的工作,但分成任务槽。我想知道我们如何做到这一点,否则任何在线资源将不胜感激。

我看到我们可以使用 setParallelism() 进行设置,但我很难找到一种方法来设置处理。

所以只是给你一个想法..这是一个示例提要

现在在上面的 xml 示例中,我想划分并并行处理它。

这类似于我们如何在 java 中使用 ExecutorService 进行多线程处理。

在普通的 java 中,我会执行 executorService.submit(subtagTask) 来处理 subTag。

我想知道在 Flink 中是否有办法做到这一点。这将帮助我避免处理线程的头痛和随之而来的头痛。

任何帮助表示赞赏。

问候。

0 投票
0 回答
125 浏览

mongodb - 用于批量 mongodb 更新的 java Apache Flink 批处理性能

专家们,

我正在尝试对 larget 数据集执行一些 ETL 操作作为批处理。我的要求是提取数据,对其进行转换,然后保存到 mongoDB。我正在使用 Apache FLINK,但性能非常慢,因为我正在对每一行进行 mongoDB 更新。

有什么方法可以让我们作为批量记录下沉,以便提高性能。就像在所有转换之后我们在 mongoDB 上进行批量更新一样。我们可以将它们全部聚合起来,最后像流 [.aggregate() .sink({bulk update})] 一样将其下沉到数据库中

我们可以在转换后收集整个集合,然后进行批量 mongoDB 更新吗?目前在地图功能上我正在做更新操作。我已将并行度设置为 8 { setParallelism(8);}

0 投票
1 回答
937 浏览

apache-kafka - AppendStreamTableSink 不支持消费由节点 Join(joinType=[InnerJoin]

我使用Flink SQL执行如下语句时,报错如下:</p>

要求

根据字段对user_behavior_kafka_table中的数据进行分组user_id,然后取出ts每组中字段值最大的那条数据

执行sql

Flink 版本

1.11.2

错误信息

作业部署

纱线上

表消息

  • 来自消费者 kafka 主题的user_behavior_kafka_table 数据

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":100}

{"user_id":"ccc","item_id":"11-222-334","comment":"ccc 访问项目在","ts":200}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":300}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":200}

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":200}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":100}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":100}

  • user_behavior_hive_table 预期结果

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}

0 投票
1 回答
269 浏览

java - Apache Flink - 将卷挂载到 Job Pod

我正在使用https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm上的教程中的 WordCountProg 。代码如下:

WordCountProg.java

此示例将文本文件作为输入,提供一个单词在文档中出现的次数的计数,并将结果写入输出文件。

我正在使用以下 Dockerfile 创建我的作业映像:

Dockerfile

然后我的工作的 yaml 如下所示:

目标是将 /Users/my-user/Documents/semafor/apache_flink/PV 挂载到作为作业输入的 Pod 中,其中有一个 READ.txt 文件。但是当作业尝试执行时,我收到以下错误:

我试图运行:

还运行了 chmod 777 ...但我得到了同样的错误。

我还尝试将 jar 复制到 READ.txt 文件所在的位置:/Users/my-user/Documents/semafor/apache_flink/PV在我的本地目录上并将其挂载到 /opt/flink/usrlib ,但后来我得到了:

我在 Kubernetes 或 Flink 方面没有那么丰富的经验,所以我不确定我是否安装不正确或者我做错了什么。如果您有任何建议,请lmk。提前致谢。

0 投票
1 回答
252 浏览

apache-flink - Flink Kafka 自定义消费者用于速率限制

我正在尝试扩展 FlinkKafkaConsumer 以使用 flink 版本 1.12 在我的 Flink 作业中限制 Kafka 消费者。作为其中的一部分,我尝试遵循以下线程来实现相同的目的。但是,我在创建AbstractFetcher时在createFetcher方法中遇到了编译问题。

如何在 flink 上使用 Ratelimiter?

他们是在emitRecord中命名的方法吗,因为我找不到KafkaFetcherAbstractFetcher类?

下面是代码片段

解决此问题的任何建议是获取 Flink Kafka Consumer 的自定义速率限制

0 投票
1 回答
83 浏览

apache-flink - ExecutionEnvironment 和 StreamExecutionEnvironment 有什么区别

我是flink的新手。我注意到在使用 java 的 flink 编程过程中,有时它被声明为 StreamExecutionEnvironment,有时被声明为 ExecutionEnvironment。这两个类的主要区别是什么。什么时候用什么。

0 投票
0 回答
126 浏览

apache-flink - Flink 对 hadoop 3.X 的支持

我正在使用 Flink 1.13.1,我们计划注入基于 ftp 的源。由于 Flink 依赖 Hadoop 依赖来支持此类操作,所以我需要了解 Flink 是否支持 Hadoop 3.X。该文档没有提到任何关于此的内容。