我们有两个 HDP 集群的设置,我们称它们为 A 和 B。
集群 A 节点:
- 共包含20台商品机。
- 有20个数据节点。
- 由于配置了namenode HA,因此有一个活动的和一个备用的namenode。
集群 B 节点:
- 共包含5台商品机。
- 有5个数据节点。
- 没有配置 HA,并且该集群有一个主名称节点和一个辅助名称节点。
我们的应用程序中有三个主要组件,它们对传入的文件执行 ETL(提取、转换和加载)操作。我将这些组件分别称为 E、T 和 L。
组件 E 特性:
- 该组件是一个 Apache Spark 作业,它仅在集群 B 上运行。
- 它的工作是从 NAS 存储中提取文件并将它们放入集群 B 中的 HDFS。
组件 T 特性:
- 该组件也是一个 Apache Spark 作业,它在集群 B 上运行。
- 它的工作是把组件 E 写入的 HDFS 中的文件提取出来,对其进行转换,然后将转换后的文件写入集群 A 中的 HDFS。
组件 L 特性:
- 该组件也是一个 Apache Spark 作业,它仅在集群 A 上运行。
- 它的工作是拾取组件 T 写入的文件并将数据加载到集群 A 中存在的 Hive 表中。
组件 L 是所有三个组件中的佼佼者,我们没有遇到任何故障。组件 E 中有一些无法解释的小故障,但组件 T 是最麻烦的一个。
组件 E 和 T 都使用 DFS 客户端与名称节点通信。
以下是我们在运行组件 T 时间歇性观察到的异常的摘录:
clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
at org.apache.hadoop.ipc.Client.call(Client.java:1459)
at org.apache.hadoop.ipc.Client.call(Client.java:1392)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy15.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy16.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)
如前所述,我们会间歇性地遇到这个异常,当它确实发生时,我们的应用程序会卡住,导致我们重新启动它。
我们尝试过的解决方案:
我们的第一个怀疑是我们正在重载集群 A 中的活动名称节点,因为组件 T 确实并行打开了许多 DFS 客户端并对不同的文件执行文件操作(没有争用相同文件的问题)。为了解决这个问题,我们查看了 namenode dfs.namenode.handler.count和ipc.server.listen.queue.size的两个关键参数,并将后者从 128(默认)提高到 1024。
不幸的是,这个问题仍然存在于组件 T 中。我们开始对这个问题采取不同的方法。我们只专注于寻找发生对等连接重置的原因。根据很多文章和堆栈交换讨论,问题描述如下,对端设置了RST标志,导致连接立即终止。在我们的例子中,我们确定对等点是集群 A 的名称节点。
牢记 RST 标志,我深入了解 TCP 通信的内部结构,只了解 RST 标志的原因。
- Linux 发行版(不是 BSD)中的每个套接字都有两个与之关联的队列,即接受和积压队列。
- 在 TCP 握手过程中,所有请求都保留在 backlog 队列中,直到收到来自开始建立连接的节点的 ACK 数据包。一旦收到,请求就会被转移到接受队列,打开套接字的应用程序可以开始接收来自远程客户端的数据包。
- 积压队列的大小由两个内核级参数控制,即net.ipv4.tcp_max_syn_backlog和net.core.somaxconn,而应用程序(在我们的例子中为 namenode )可以向内核请求它希望受上限限制的队列大小(我们认为接受队列大小是由ipc.server.listen.queue.size定义的队列大小)。
- 另外,这里要注意的另一个有趣的事情是,如果net.ipv4.tcp_max_syn_backlog的大小大于net.core.somaxconn,则前者的值将被截断为后者的值。此声明基于 Linux 文档,可在https://linux.die.net/man/2/listen找到。
回到正题,当 backlog 完全填满时,TCP 有两种行为方式,这种行为也可以由名为net.ipv4.tcp_abort_on_overflow的内核参数控制。默认设置为 0,当积压已满时,内核会丢弃任何新的 SYN 数据包,这反过来又让发送方重新发送 SYN 数据包。当设置为 1 时,内核将在数据包中标记 RST 标志并将其发送给发送方,从而突然终止连接。
我们检查了上述内核参数的值,发现net.core.somaxconn设置为 1024,net.ipv4.tcp_abort_on_overflow设置为 0,net.ipv4.tcp_max_syn_backlog设置为 4096集群。
我们现在唯一怀疑的是连接集群 A 和集群 B 的交换机,因为任何集群中的任何机器都不会设置 RST 标志,因为参数net.ipv4.tcp_abort_on_overflow设置为 0。
我的问题
- 从 HDFS 文档中可以明显看出,DFS Client 使用 RPC 与 namenode 通信以执行文件操作。每个 RPC 调用是否都涉及到 namenode 的 TCP 连接的建立?
- 参数ipc.server.listen.queue.size是否定义了 namenode 接受 RPC 请求的套接字的接受队列长度?
- 重负载时,namenode 是否可以隐式关闭与 DFS 客户端的连接,从而使内核发送设置了 RST 标志的数据包,即使内核参数net.ipv4.tcp_abort_on_overflow设置为 0?
- L2 或 L3 交换机(用于连接我们两个集群中的机器)是否能够设置 RST 标志,因为它们无法处理突发流量?
我们解决这个问题的下一个方法是通过使用 tcpdump 或 wireshark 分析数据包来确定哪台机器或交换机(不涉及路由器)正在设置 RST 标志。我们还将上述所有队列的大小增加到 4096,以有效处理突发流量。
namenode 日志没有显示任何异常的迹象,除了在 Ambari 中看到的 Namenode 连接负载在某些时间点偷看,而不一定是在发生 Connection Reset By Peer 异常时。
最后,我想知道我们是否走上了解决这个问题的正确轨道,或者我们是否会陷入死胡同?
PS对于我的问题中内容的长度,我深表歉意。在寻求任何帮助或建议之前,我想向读者展示整个背景。感谢您的耐心等待。