1

集群基础设施:

我们有 Flink 独立集群,有 4 个节点,每个节点有 16 个 CPU 核和 32Gb 物理内存,其中 16GB 设置为 Flink 托管内存,其余设置为 UDF 和 Java Heap。因此,每个插槽,我们分配了 1 个核心和 1GB 的内存。

场景描述:

我们正在尝试连接两个数据集 A 和 B,其中数据集 A 是<String, ArrayList>的元组,数据集 B 具有自定义 POJO,并且数据集的连接键都是String

对于这两种数据集的大小都不能保证,在某个时间点 A 可能很大,而在另一个时间点数据集 B 可能更大。此外,一个数据集很可能有多个重复条目列表。

例如: 数据集 A 具有 <String, LocationClass> 大小 = 51 mb
的信息 数据集 B 可能具有大小 = 171 mb
连接键的信息:位置示例、孟买、纽约等。

因此,为了加入这个我们选择了一个 joinHint 策略作为Repartition_Hash_First。此策略有时效果很好,有时会引发以下异常,

java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.
Probable cause Too many duplicate keys.

所以我们尝试使用 Repartition_Hash_Second 但结果是一样的。

因此,根据我的理解,Flink 在内部为提供即 First 或 Second 的一侧创建了一个哈希表,而另一侧的数据被迭代到哈希表,反之亦然,因为其中一个键有很多无法容纳的数据在创建哈希表时进入 flink 的实际内存,它会引发过多重复键的异常。

因此,在第二步中,我们尝试使用 Repartition_Sort_merge 来实现这一点,但我们得到了以下异常,

java.lang.Exception:caused an error obtaining the sort input. the record exceeds maximum size of sort buffer.

如果我们需要将 flink 托管内存增加到 2 GB 甚至更多,谁能建议我?还是我们应该选择一些不同的策略来处理这个问题?

4

1 回答 1

1

我似乎很清楚你的问题是有一个太大的重复组。

此外,重复组可能在两侧,为该组产生 O(n^2) 大小,n 是最大重复组大小。

如果可能的话,我建议您事先对双方进行重复数据删除,例如使用DeduplicateKeepLastRowFunction之类的东西。或者使用行中的其他数据构建更精细的键。

于 2020-09-04T13:58:42.533 回答