问题标签 [flink-batch]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
309 浏览

apache-flink - 使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器

在使用MiniClusterWithClientResource测试 flink 作业时,有什么方法可以控制触发计时器的处理时间?

我能够使用测试工具在单元测试中测试 KeyedCoProcessFunction 即 processElement()... 触发计时器回调即 onTimer()...的两种方法并控制处理时间,即:

//通过直接提前算子的处理时间触发处理时间计时器 testHarness.setProcessingTime(300000)

因此。我可以在指定的时间触发定时器

但是,我现在需要的是使用 minicluster MiniClusterWithClientResource在端到端的 flink 作业测试中触发计时器

val flinkCluster = new MiniClusterWithClientResource... 并且能够提前处理时间来触发 onTimer 方法

0 投票
1 回答
623 浏览

apache-flink - Apache Flink JobListener 无法正常工作

我在 flink 1.11.1 中写了一个 flink 批处理作业。工作成功完成后,我想做一些类似调用http服务的事情。

我添加了一个简单的作业侦听器来挂钩作业状态。问题是当 kafka sink 操作员抛出错误时,作业侦听器不会触发。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。

我如何确定工作是否成功完成?

任何帮助将不胜感激。

0 投票
1 回答
176 浏览

apache-flink - 如何使用 Apache Flink CEP SQL 从已经匹配的模式中获取事件?

我的要求是基于 2 个事件(EVT_A 和 EVT_B 独立于顺序)生成触发器。这里是期待

我尝试了以下但没有成功。

我也试过“ AFTER MATCH SKIP TO FIRST A ”。但它也有例外。任何建议我如何使用 Flink SQL CEP 或 Flink 中的任何其他方式来实现这一点。

0 投票
1 回答
256 浏览

apache-flink - Apache Flink 中的 Hash Join 和 Sort 合并异常

集群基础设施:

我们有 Flink 独立集群,有 4 个节点,每个节点有 16 个 CPU 核和 32Gb 物理内存,其中 16GB 设置为 Flink 托管内存,其余设置为 UDF 和 Java Heap。因此,每个插槽,我们分配了 1 个核心和 1GB 的内存。

场景描述:

我们正在尝试连接两个数据集 A 和 B,其中数据集 A 是<String, ArrayList>的元组,数据集 B 具有自定义 POJO,并且数据集的连接键都是String

对于这两种数据集的大小都不能保证,在某个时间点 A 可能很大,而在另一个时间点数据集 B 可能更大。此外,一个数据集很可能有多个重复条目列表。

例如: 数据集 A 具有 <String, LocationClass> 大小 = 51 mb
的信息 数据集 B 可能具有大小 = 171 mb
连接键的信息:位置示例、孟买、纽约等。

因此,为了加入这个我们选择了一个 joinHint 策略作为Repartition_Hash_First。此策略有时效果很好,有时会引发以下异常,

所以我们尝试使用 Repartition_Hash_Second 但结果是一样的。

因此,根据我的理解,Flink 在内部为提供即 First 或 Second 的一侧创建了一个哈希表,而另一侧的数据被迭代到哈希表,反之亦然,因为其中一个键有很多无法容纳的数据在创建哈希表时进入 flink 的实际内存,它会引发过多重复键的异常。

因此,在第二步中,我们尝试使用 Repartition_Sort_merge 来实现这一点,但我们得到了以下异常,

如果我们需要将 flink 托管内存增加到 2 GB 甚至更多,谁能建议我?还是我们应该选择一些不同的策略来处理这个问题?

0 投票
1 回答
135 浏览

elasticsearch - 使用 Flink Rich InputFormat 创建 Elasticsearch 的输入格式

我们正在使用 Elasticsearch 6.8.4 和 Flink 1.0.18。

我们在 elasticsearch 中有一个包含 1 个分片和 1 个副本的索引,我想创建自定义输入格式以使用 apache Flink 数据集 API 在 elasticsearch 中读取和写入数据,并具有超过 1 个输入拆分,以实现更好的性能。那么有什么办法可以达到这个要求吗?

注意:每个文档的大小更大(几乎 8mb),由于大小限制,我一次只能读取 10 个文档,并且每个读取请求,我们想要检索 500k 条记录。

根据我的理解,并行度数应该等于数据源的分片/分区数。但是,由于我们只存储少量数据,因此我们将分片的数量保持为 1,并且我们有一个静态数据,它每月会略微增加。

任何帮助或源代码示例将不胜感激。

0 投票
0 回答
131 浏览

apache-flink - 为什么 Apache Flink SQL 验证器为此 CEP SQL 提供 NPE?

这是我的 Flink CEP MATCH_RECOGNIZE sql。

给出错误为

然而,如果我删除 2 个 EXISTS 条件,那么它就可以工作。另外,我尝试为我的数据流创建另一个表并在内部查询中使用它,但仍然出现相同的错误?Flink CEP SQL 不支持我做错了什么或这样的语法?

注意-我使用的是 1.11 版本。我已经在普通的 FLINK SQL 中尝试过 EXISTS 子句(即选择 ....from....where..)并且它有效。但不适用于 Flink CEP SQL。谢谢。

0 投票
0 回答
280 浏览

elasticsearch - Flink 弹性搜索源连接器

我是 Flink 和 Elastic Search 集成的新手。我有一个场景,我必须将历史数据(大约 1TB)从旧的弹性搜索集群(5.6)加载到新的集群(6.8)。我必须在迁移过程中进行一些数据过滤和修改。考虑使用 flink 批处理作业和 flink-es-sink 操作符。

但是由于目前没有可用的 flink-es-source 运算符,因此将数据源到我的 flink 管道中的最佳方法是什么。我有几个选择来做到这一点。

  1. 写一个flatmap函数/处理函数并获取记录
  2. 使用一些开源的 3rd 方库将 flink 连接到 ES。但不想冒险,因为不知道这些程序如何执行

但不确定哪个是最好的方法,因为数据量很大,我可能不得不并行化源操作符。

如果你们中的任何人遇到这种情况,请提出几个选项。提前致谢

0 投票
1 回答
277 浏览

java - 在 apache flink 中使用 DataSet 的 collect() 函数的问题

我正在尝试计算社交媒体下图中的 AdamicAdar 关系指数。我使用 apache flink-gelly lirbrarie 设置了我的边、顶点、数据集和图形。这是我的代码:

这是我得到的错误:

这也是我使用的 edges.csv 文件的一部分:

其中 5 316 表示顶点 5 连接到顶点 216,这定义了一条边。

这是我的 pom.xml 文件 pom.xml

0 投票
0 回答
163 浏览

java - Apache Flink 流限制

从 DataSource 获取后,我需要过滤很多有界数据(例如,由于复杂的过滤逻辑,我无法使用查询立即过滤它)。而且我需要限制管道末端的最大数据量(以实现某种分页)。

Stream.limit()因此,在积累了所需的数据量后,我需要某种 Java来停止从 DataSource 获取行。

在 Apache Flink 中这样做的最佳方法是什么?我在看柜台,但也许有更合适的 API?

0 投票
1 回答
120 浏览

apache-flink - 使用 Java 设置 Flink 分离模式

Flink 集群详细信息,
节点数:4
Flink 版本:1.11
Flink 客户端:RestCluserClient

我们正在使用 PackagedProgram 从流式作业提交 Flink 批处理作业,但我们的要求是一次只执行一个作业,假设我们从源获得 2 个事件,因此理想情况下必须触发 2 个批处理作业(每个事件每个),但只有一个一次。为了实现这一点,我们使用了client.setDetached(false)(在之前的 flink 版本中),但是一旦我们将它迁移到 1.11,setDetached(false) API 就被删除了。

我们知道如何实现这个要求吗?