集群基础设施:
我们有 Flink 独立集群,有 4 个节点,每个节点有 16 个 CPU 核和 32Gb 物理内存,其中 16GB 设置为 Flink 托管内存,其余设置为 UDF 和 Java Heap。因此,每个插槽,我们分配了 1 个核心和 1GB 的内存。
场景描述:
我们正在尝试连接两个数据集 A 和 B,其中数据集 A 是<String, ArrayList>的元组,数据集 B 具有自定义 POJO,并且数据集的连接键都是String。
对于这两种数据集的大小都不能保证,在某个时间点 A 可能很大,而在另一个时间点数据集 B 可能更大。此外,一个数据集很可能有多个重复条目列表。
例如:
数据集 A 具有 <String, LocationClass> 大小 = 51 mb
的信息 数据集 B 可能具有大小 = 171 mb
连接键的信息:位置示例、孟买、纽约等。
因此,为了加入这个我们选择了一个 joinHint 策略作为Repartition_Hash_First。此策略有时效果很好,有时会引发以下异常,
java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.
Probable cause Too many duplicate keys.
所以我们尝试使用 Repartition_Hash_Second 但结果是一样的。
因此,根据我的理解,Flink 在内部为提供即 First 或 Second 的一侧创建了一个哈希表,而另一侧的数据被迭代到哈希表,反之亦然,因为其中一个键有很多无法容纳的数据在创建哈希表时进入 flink 的实际内存,它会引发过多重复键的异常。
因此,在第二步中,我们尝试使用 Repartition_Sort_merge 来实现这一点,但我们得到了以下异常,
java.lang.Exception:caused an error obtaining the sort input. the record exceeds maximum size of sort buffer.
如果我们需要将 flink 托管内存增加到 2 GB 甚至更多,谁能建议我?还是我们应该选择一些不同的策略来处理这个问题?