  • Spark 3.0.1
  • hadoop-aws 3.2.0

I have a simple Spark Structured Streaming application that reads messages from a Kafka topic, aggregates them, and writes the results to Elasticsearch. I use checkpointing, with an S3 bucket as the checkpoint location.
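Roughly, the application has the shape of the sketch below (a minimal outline only: the broker, topic, bucket, and index names are hypothetical, the real aggregation is more involved, and the ES sink is expressed here via foreachBatch with the elasticsearch-hadoop connector):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("kafka-to-es").getOrCreate()
    import spark.implicits._

    // Read from Kafka
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
      .option("subscribe", "events")                    // hypothetical topic
      .load()

    // Stateful aggregation, so every micro-batch touches the state store,
    // which is checkpointed to S3
    val counts = input
      .selectExpr("CAST(key AS STRING) AS key")
      .groupBy($"key")
      .count()

    counts.writeStream
      .outputMode("update")
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.write
          .format("org.elasticsearch.spark.sql") // connector options omitted
          .mode("append")
          .save("my-index")                      // hypothetical index
      }
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/app")
      .start()
      .awaitTermination()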

After a while, the application starts to fail with the following exception:

[476.099s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
Error in TaskCompletionListener
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:801)
at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:939)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1345)
at com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:480)
at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
at com.google.common.util.concurrent.ForwardingListeningExecutorService.submit(ForwardingListeningExecutorService.java:40)
at org.apache.hadoop.util.SemaphoredDelegatingExecutor.submit(SemaphoredDelegatingExecutor.java:112)
at com.google.common.util.concurrent.ForwardingListeningExecutorService.submit(ForwardingListeningExecutorService.java:40)
at org.apache.hadoop.util.SemaphoredDelegatingExecutor.submit(SemaphoredDelegatingExecutor.java:112)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:434)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:365)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.cancel(CheckpointFileManager.scala:163)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:507)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$2(package.scala:65)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$2$adapted(package.scala:64)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
at org.apache.spark.scheduler.Task.run(Task.scala:143)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)

VisualVM shows the thread count climbing from the very start until it reaches a maximum (~4.8K):

Most of them are:

  • s3a-transfer-unbounded-poolXXX-tXX
  • s3a-transfer-shared-poolXXX-tXX

As far as I understand, the only place these thread pools get created is

org.apache.hadoop.fs.s3a.S3AFileSystem#initialize

and Spark creates a new FileSystem every time

org.apache.spark.sql.execution.streaming.StreamMetadata#write

is called.
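Whether each of those lookups actually constructs a new instance depends on the Hadoop FileSystem cache. A small sketch of that behavior (bucket name hypothetical): with the cache on, repeated lookups share one S3AFileSystem; with it off, every lookup runs initialize() and spins up a fresh set of s3a-transfer-* pools:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    val conf = new Configuration()
    val bucket = new URI("s3a://my-bucket/") // hypothetical bucket

    // Default (fs.s3a.impl.disable.cache = false): both calls return the SAME
    // cached S3AFileSystem, so initialize() and its thread pools run only once.
    val fs1 = FileSystem.get(bucket, conf)
    val fs2 = FileSystem.get(bucket, conf)
    assert(fs1 eq fs2)

    // With caching disabled, every get() builds a new S3AFileSystem via
    // initialize(), each with its own s3a-transfer-shared/unbounded pools:
    conf.setBoolean("fs.s3a.impl.disable.cache", true)
    val fs3 = FileSystem.get(bucket, conf)
    val fs4 = FileSystem.get(bucket, conf)
    assert(fs3 ne fs4) // two instances, two sets of pools, leaked unless closed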

Why does this happen, and how can I prevent these threads from being created?


1 Answer


You can't stop these threads from being created: the AWS transfer manager needs a thread pool, and it lives inside the AWS library. What the S3A close() method does do is shut down the transfer manager and its thread pool. Which means: the problem is that Spark is not closing the FS instances.
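To illustrate that lifecycle (a sketch, not part of the fix): if you create uncached instances yourself, calling close() is what releases those pools:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    val conf = new Configuration()
    // newInstance() always bypasses the FS cache, so the caller owns the lifecycle
    val fs = FileSystem.newInstance(new URI("s3a://my-bucket/"), conf) // hypothetical bucket
    try {
      // ... use fs ...
    } finally {
      fs.close() // shuts down the transfer manager and its s3a-transfer-* pools
    }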

Make sure you are not disabling caching of FS instances; that is, fs.s3a.impl.disable.cache must be false. This is the default, so find out where it is being changed and stop it:

spark.hadoop.fs.s3a.impl.disable.cache false
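For example, pinning the flag back to its default when building the session would look roughly like this (the real fix being to find and remove whatever sets it to true):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-to-es") // hypothetical app name
      .config("spark.hadoop.fs.s3a.impl.disable.cache", "false") // the default
      .getOrCreate()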
answered 2021-03-04 at 14:30