16

我们有两个 HDP 集群的设置,我们称它们为 A 和 B。

集群 A 节点

  • 共包含20台商品机。
  • 有20个数据节点。
  • 由于配置了namenode HA,因此有一个活动的和一个备用的namenode。

集群 B 节点

  • 共包含5台商品机。
  • 有5个数据节点。
  • 没有配置 HA,并且该集群有一个主名称节点和一个辅助名称节点。

我们的应用程序中有三个主要组件,它们对传入的文件执行 ETL(提取、转换和加载)操作。我将这些组件分别称为 E、T 和 L。

组件 E 特性

  • 该组件是一个 Apache Spark 作业,它仅在集群 B 上运行。
  • 它的工作是从 NAS 存储中提取文件并将它们放入集群 B 中的 HDFS。

组件 T 特性

  • 该组件也是一个 Apache Spark 作业,它在集群 B 上运行。
  • 它的工作是把组件 E 写入的 HDFS 中的文件提取出来,对其进行转换,然后将转换后的文件写入集群 A 中的 HDFS。

组件 L 特性

  • 该组件也是一个 Apache Spark 作业,它仅在集群 A 上运行。
  • 它的工作是拾取组件 T 写入的文件并将数据加载到集群 A 中存在的 Hive 表中。

组件 L 是所有三个组件中的佼佼者,我们没有遇到任何故障。组件 E 中有一些无法解释的小故障,但组件 T 是最麻烦的一个。

组件 E 和 T 都使用 DFS 客户端与名称节点通信。

以下是我们在运行组件 T 时间歇性观察到的异常的摘录:

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
            at org.apache.hadoop.ipc.Client.call(Client.java:1459)
            at org.apache.hadoop.ipc.Client.call(Client.java:1392)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy15.complete(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
            at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
            at com.sun.proxy.$Proxy16.complete(Unknown Source)
            at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
            at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
            at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
            at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
            at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
            at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
            at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
            at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
            at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
            at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
            at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
            at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
            at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
            at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
            at java.io.DataInputStream.readInt(DataInputStream.java:387)
            at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)   

如前所述,我们会间歇性地遇到这个异常,当它确实发生时,我们的应用程序会卡住,导致我们重新启动它。

我们尝试过的解决方案:

  • 我们的第一个怀疑是我们正在重载集群 A 中的活动名称节点,因为组件 T 确实并行打开了许多 DFS 客户端并对不同的文件执行文件操作(没有争用相同文件的问题)。为了解决这个问题,我们查看了 namenode dfs.namenode.handler.countipc.server.listen.queue.size的两个关键参数,并将后者从 128(默认)提高到 1024。

  • 不幸的是,这个问题仍然存在于组件 T 中。我们开始对这个问题采取不同的方法。我们只专注于寻找发生对等连接重置的原因。根据很多文章和堆栈交换讨论,问题描述如下,对端设置RST标志,导致连接立即终止。在我们的例子中,我们确定对等点是集群 A 的名称节点。

  • 牢记 RST 标志,我深入了解 TCP 通信的内部结构,只了解 RST 标志的原因。

  • Linux 发行版(不是 BSD)中的每个套接字都有两个与之关联的队列,即接受和积压队列。
  • 在 TCP 握手过程中,所有请求都保留在 backlog 队列中,直到收到来自开始建立连接的节点的 ACK 数据包。一旦收到,请求就会被转移到接受队列,打开套接字的应用程序可以开始接收来自远程客户端的数据包。
  • 积压队列的大小由两个内核级参数控制,即net.ipv4.tcp_max_syn_backlognet.core.somaxconn,而应用程序(在我们的例子中为 namenode )可以向内核请求它希望受上限限制的队列大小(我们认为接受队列大小是由ipc.server.listen.queue.size定义的队列大小)。
  • 另外,这里要注意的另一个有趣的事情是,如果net.ipv4.tcp_max_syn_backlog的大小大于net.core.somaxconn,则前者的值将被截断为后者的值。此声明基于 Linux 文档,可在https://linux.die.net/man/2/listen找到。
  • 回到正题,当 backlog 完全填满时,TCP 有两种行为方式,这种行为也可以由名为net.ipv4.tcp_abort_on_overflow的内核参数控制。默认设置为 0,当积压已满时,内核会丢弃任何新的 SYN 数据包,这反过来又让发送方重新发送 SYN 数据包。当设置为 1 时,内核将在数据包中标记 RST 标志并将其发送给发送方,从而突然终止连接。

  • 我们检查了上述内核参数的值,发现net.core.somaxconn设置为 1024,net.ipv4.tcp_abort_on_overflow设置为 0,net.ipv4.tcp_max_syn_backlog设置为 4096集群。

  • 我们现在唯一怀疑的是连接集群 A 和集群 B 的交换机,因为任何集群中的任何机器都不会设置 RST 标志,因为参数net.ipv4.tcp_abort_on_overflow设置为 0。

我的问题

  • 从 HDFS 文档中可以明显看出,DFS Client 使用 RPC 与 namenode 通信以执行文件操作。每个 RPC 调用是否都涉及到 namenode 的 TCP 连接的建立?
  • 参数ipc.server.listen.queue.size是否定义了 namenode 接受 RPC 请求的套接字的接受队列长度?
  • 重负载时,namenode 是否可以隐式关闭与 DFS 客户端的连接,从而使内核发送设置了 RST 标志的数据包,即使内核参数net.ipv4.tcp_abort_on_overflow设置为 0?
  • L2 或 L3 交换机(用于连接我们两个集群中的机器)是否能够设置 RST 标志,因为它们无法处理突发流量?

我们解决这个问题的下一个方法是通过使用 tcpdump 或 wireshark 分析数据包来确定哪台机器或交换机(不涉及路由器)正在设置 RST 标志。我们还将上述所有队列的大小增加到 4096,以有效处理突发流量。

namenode 日志没有显示任何异常的迹象,除了在 Ambari 中看到的 Namenode 连接负载在某些时间点偷看,而不一定是在发生 Connection Reset By Peer 异常时。

最后,我想知道我们是否走上了解决这个问题的正确轨道,或者我们是否会陷入死胡同?

PS对于我的问题中内容的长度,我深表歉意。在寻求任何帮助或建议之前,我想向读者展示整个背景。感谢您的耐心等待。

4

1 回答 1

3

首先,您的网络中可能确实存在一些奇怪的东西,也许您会设法通过您提到的步骤来追踪它。

话虽如此,在查看步骤时,我个人认为发生了一些违反直觉的事情。

您目前有步骤 T 进行转换,以及最脆弱的集群内传输。也许您看到的可靠性比人们通常看到的要差,但我会认真考虑将复杂部分和脆弱部分分开。

如果你这样做(或简单地将工作分成更小的块),设计一个解决方案应该相当简单,它可能会看到其脆弱的步骤不时失败,但在发生这种情况时会简单地重试。当然,重试成本最低,因为只有一小部分工作需要重试。


总结:这可能有助于解决您的连接问题,但如果可能的话,您可能希望设计为间歇性故障。

于 2019-04-12T14:29:17.480 回答