3

我的技术任务是将数据从 GCS(谷歌云存储)同步到我们的 Elasticsearch 集群。

我们在 Google Dataproc 集群上使用 Apache Spark 2.4 和 Elastic Hadoop 连接器(启用自动缩放)。

在执行过程中,如果 Dataproc 集群规模缩小,则停用节点上的所有任务都会丢失,并且该节点上处理过的数据永远不会推送到弹性。

例如,当我保存到 GCS 或 HDFS 时,不存在此问题。

即使节点退役,如何使这项任务具有弹性?

堆栈跟踪的摘录:

第 2.3 阶段丢失任务 50.0(TID 427,xxxxxxx-sw-vrb7.c.xxxxxxx,执行程序 43):FetchFailed(BlockManagerId(30,xxxxxxx-w-23.c.xxxxxxx,7337,无),shuffleId=0,mapId =26, reduceId=170, message=org.apache.spark.shuffle.FetchFailedException: 无法连接到 xxxxxxx-w-23.c.xxxxxxx:7337

引起:java.net.UnknownHostException: xxxxxxx-w-23.c.xxxxxxx

阶段 2.3 (TID 427) 中的任务 50.0 失败,但不会重新执行该任务(可能是因为任务因 shuffle 数据获取失败而失败,因此需要重新运行上一个阶段,或者因为不同的副本任务已经成功)。

谢谢。弗雷德

4

1 回答 1

4

我将介绍一些关于“缩减问题”的背景知识以及如何缓解它。请注意,此信息适用于手动缩减以及抢占式 VM 被抢占。

背景

自动缩放会根据集群中“可用”YARN 内存的数量移除节点。它不考虑集群上的 shuffle 数据。这是我们最近的演示文稿中的插图。

在 MapReduce 风格的作业中(Spark 作业是一组 MapReduce 风格的阶段之间的洗牌),来自所有映射器的数据必须到达所有减速器。Mapper 将他们的 shuffle 数据写入本地磁盘,然后 reducer 从每个 mapper 中获取数据。每个节点上都有一个服务器专门用于提供 shuffle 数据,它在 YARN 之外运行。因此,一个节点在 YARN 中可能会显得空闲,即使它需要留下来提供其 shuffle 数据。

正常洗牌

当单个节点被移除时,几乎所有的 reducer 都会失败,因为它们都需要从每个节点获取数据。Reducers 将特别失败FetchFailedException(如您所见),表明它们无法从特定节点获取 shuffle 数据。驱动程序最终会重新运行必要的映射器,然后重新运行 reduce 阶段。Spark 的效率有点低(https://issues.apache.org/jira/browse/SPARK-20178),但它可以工作。

移除节点时随机播放

请注意,您可能会在以下三种情况之一中丢失节点:

  1. 有意删除节点(自动缩放或手动缩减)
  2. 抢占式虚拟机 被抢占。抢占式虚拟机至少每 24 小时被抢占一次。
  3. (相对罕见)标准 GCE 虚拟机被 GCE 非正常终止,并重新启动。通常,标准 VM 是透明地实时迁移的。

当您创建自动扩缩集群时,Dataproc会添加几个属性来提高作业在节点丢失时的弹性:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10
spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

请注意,如果您在现有集群上启用自动缩放,则不会设置这些属性。(但您可以在创建集群时手动设置它们)。

缓解措施

1)使用优雅的退役

Dataproc 与YARN 的 Graceful Decommissioning集成,可以在 Autoscaling Policies 或手动缩减操作上进行设置。

当优雅地停用节点时,YARN 会保留它,直到在节点上运行容器的应用程序完成,但不会让它运行新容器。这使节点有机会在被删除之前提供其洗牌数据。

您将需要确保您的正常停用超时时间足够长,以涵盖您最长的作业。自动缩放文档建议1h作为起点。

请注意,只有处理大量短作业的长时间运行的集群才真正有意义。

在临时集群上,最好从一开始就“调整规模”集群,或者禁用缩减规模,除非集群完全空闲(设置scaleDownMinWorkerFraction=1.0)。

2) 避免抢占式虚拟机

即使使用优雅退役,抢占式 VM 也会通过“抢占”定期终止。GCE 保证抢占式虚拟机会在 24 小时内被抢占,大型集群上的抢占非常分散。

如果您正在使用正常停用,并且FetchFailedException错误消息包括-sw-,您可能会看到由于节点被抢占而导致的获取失败。

您有两个选项可以避免使用抢占式虚拟机: 1. 在您的自动扩展策略中,您可以设置secondaryWorkerConfig拥有 0 分钟和最大实例,而是将所有工作人员放在主要组中。2. 或者,您可以继续使用“次要”工作人员,但设置--properties dataproc:secondary-workers.is-preemptible.override=false. 这将使您的辅助工作者成为标准虚拟机。

3) 长期:增强的灵活性模式

Dataproc 的增强灵活性模式是对随机播放问题的长期解决方案。

缩小问题是由存储在本地磁盘上的随机数据引起的。EFM 将包括新的shuffle 实现,允许将 shuffle 数据放置在一组固定的节点(例如,仅主要工作人员)上,或放在集群外部的存储上。

这将使二级工人无国籍,这意味着他们可以随时被移除。这使得自动缩放更加引人注目。

目前,EFM 仍处于 Alpha 阶段,无法扩展到现实世界的工作负载,但请注意在夏季之前推出可用于生产的 Beta。

于 2020-01-27T22:41:05.773 回答