我将介绍一些关于“缩减问题”的背景知识以及如何缓解它。请注意,此信息适用于手动缩减以及抢占式 VM 被抢占。
背景
自动缩放会根据集群中“可用”YARN 内存的数量移除节点。它不考虑集群上的 shuffle 数据。这是我们最近的演示文稿中的插图。
在 MapReduce 风格的作业中(Spark 作业是一组 MapReduce 风格的阶段之间的洗牌),来自所有映射器的数据必须到达所有减速器。Mapper 将他们的 shuffle 数据写入本地磁盘,然后 reducer 从每个 mapper 中获取数据。每个节点上都有一个服务器专门用于提供 shuffle 数据,它在 YARN 之外运行。因此,一个节点在 YARN 中可能会显得空闲,即使它需要留下来提供其 shuffle 数据。
当单个节点被移除时,几乎所有的 reducer 都会失败,因为它们都需要从每个节点获取数据。Reducers 将特别失败FetchFailedException
(如您所见),表明它们无法从特定节点获取 shuffle 数据。驱动程序最终会重新运行必要的映射器,然后重新运行 reduce 阶段。Spark 的效率有点低(https://issues.apache.org/jira/browse/SPARK-20178),但它可以工作。
请注意,您可能会在以下三种情况之一中丢失节点:
- 有意删除节点(自动缩放或手动缩减)
- 抢占式虚拟机
被抢占。抢占式虚拟机至少每 24 小时被抢占一次。
- (相对罕见)标准 GCE 虚拟机被 GCE 非正常终止,并重新启动。通常,标准 VM 是透明地实时迁移的。
当您创建自动扩缩集群时,Dataproc会添加几个属性来提高作业在节点丢失时的弹性:
yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10
spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h
请注意,如果您在现有集群上启用自动缩放,则不会设置这些属性。(但您可以在创建集群时手动设置它们)。
缓解措施
1)使用优雅的退役
Dataproc 与YARN 的 Graceful Decommissioning集成,可以在 Autoscaling Policies 或手动缩减操作上进行设置。
当优雅地停用节点时,YARN 会保留它,直到在节点上运行容器的应用程序完成,但不会让它运行新容器。这使节点有机会在被删除之前提供其洗牌数据。
您将需要确保您的正常停用超时时间足够长,以涵盖您最长的作业。自动缩放文档建议1h
作为起点。
请注意,只有处理大量短作业的长时间运行的集群才真正有意义。
在临时集群上,最好从一开始就“调整规模”集群,或者禁用缩减规模,除非集群完全空闲(设置scaleDownMinWorkerFraction=1.0
)。
2) 避免抢占式虚拟机
即使使用优雅退役,抢占式 VM 也会通过“抢占”定期终止。GCE 保证抢占式虚拟机会在 24 小时内被抢占,大型集群上的抢占非常分散。
如果您正在使用正常停用,并且FetchFailedException
错误消息包括-sw-
,您可能会看到由于节点被抢占而导致的获取失败。
您有两个选项可以避免使用抢占式虚拟机: 1. 在您的自动扩展策略中,您可以设置secondaryWorkerConfig
拥有 0 分钟和最大实例,而是将所有工作人员放在主要组中。2. 或者,您可以继续使用“次要”工作人员,但设置--properties dataproc:secondary-workers.is-preemptible.override=false
. 这将使您的辅助工作者成为标准虚拟机。
3) 长期:增强的灵活性模式
Dataproc 的增强灵活性模式是对随机播放问题的长期解决方案。
缩小问题是由存储在本地磁盘上的随机数据引起的。EFM 将包括新的shuffle 实现,允许将 shuffle 数据放置在一组固定的节点(例如,仅主要工作人员)上,或放在集群外部的存储上。
这将使二级工人无国籍,这意味着他们可以随时被移除。这使得自动缩放更加引人注目。
目前,EFM 仍处于 Alpha 阶段,无法扩展到现实世界的工作负载,但请注意在夏季之前推出可用于生产的 Beta。