8

我正在使用spark version 1.6.3yarn version 2.7.1.2.3附带HDP-2.3.0.0-2557. 因为spark版本在我使用的HDP版本中太旧了,我更喜欢远程使用另一个spark作为yarn模式。

这是我运行 spark shell 的方法;

./spark-shell --master yarn-client

一切似乎都很好,sparkContext正在初始化,sqlContext正在初始化。我什至可以访问我的蜂巢表。但是在某些情况下,当它尝试连接到块管理器时会遇到麻烦。

我不是专家,但我认为,当我在纱线模式下运行它时,块管理器正在我的纱线集群上运行。第一次觉得是网络问题,不想在这里问。但是,这发生在某些我还无法弄清楚的情况下。所以这让我觉得这可能不是网络问题。

这是代码;

def df = sqlContext.sql("select * from city_table")

下面的代码工作正常;

df.limit(10).count()

但是大小超过10,我不知道,每次运行都会改变;

df.count()

这引发了一个异常;

6/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more

)

我可能刚刚意识到,当有多个任务要洗牌时,就会发生这种情况。

问题是什么,是性能问题还是我看不到的其他网络问题。那是什么洗牌?如果是网络问题,是我的 spark 和 yarn 之间的问题,还是纱线本身的问题?

谢谢你。

编辑:

我只是在日志中看到了一些东西;

17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13)
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809)
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809)

有时,在另一个块管理器上重试它是可行的,但是,因为超过了默认的最大允许次数 4,所以它大部分时间都不会结束。

编辑2:

Yarn 对此真的很沉默,但我认为这是网络问题,我可以将问题迭代到某个地方;

此 spark 部署在 HDP 环境之外。当 spark 向 yarn 提交应用程序时,yarn 会通知 spark 驱动程序有关块管理器和执行器的信息。Executors 是 HDP 集群中的数据节点,在其私有网络中具有不同的 IP。但是,当涉及到在集群外部通知 spark 驱动程序时,它为所有执行程序提供相同且始终单一的 IP。这是因为 HDP 集群中的所有节点都通过路由器并具有相同的 IP。假设 IP 是150.150.150.150,当 spark 驱动程序需要连接并向执行程序询问某些内容时,它会使用此 IP 进行尝试。但是这个IP实际上是整个集群的外部IP地址,而不是单个数据节点的IP。

有没有办法让纱线通过其私有 ip 通知执行者(块管理器)。因为,他们的私有 IP 也可以从这个 spark 驱动程序正在处理的机器上访问。

4

2 回答 2

12

FetchFailedException当 reducer 任务(对于 a ShuffleDependency)无法获取 shuffle 块时,将引发异常。这通常意味着执行者(带有BlockManagerfor shuffle 块)死亡,因此出现异常:

Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093

执行程序可能会 OOMed(= 抛出 OutOfMemoryError)或 YARN 由于内存使用过多而决定终止它。

您应该使用yarn logs命令查看 Spark 应用程序的日志并找出问题的根本原因。

yarn logs -applicationId <application ID> [options]

您还可以在 Web UI 的 Executors 选项卡中查看 Spark 应用程序的执行程序的状态。

Spark 通常FetchFailedException通过重新运行受影响的任务来恢复。使用 Web UI 查看 Spark 应用程序的执行情况。FetchFailedException可能是由于临时记忆“打嗝”。

于 2016-12-30T11:05:47.253 回答
0

这是火花中的已知错误,仍在版本 2.1.0 https://issues.apache.org/jira/browse/SPARK-5928

于 2018-03-28T09:00:00.353 回答