- 火花 3.0.1
- hadoop-aws 3.2.0
我有一个简单的 spark 流应用程序,它从 Kafka 主题读取消息,将它们聚合并写入 Elasticsearch。我正在使用检查点和 S3 存储桶来存储它们。
一段时间后,应用程序开始失败,但出现以下异常:
[476.099s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
Error in TaskCompletionListener
java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:801)
at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:939)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1345)
at com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:480)
at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
at com.google.common.util.concurrent.ForwardingListeningExecutorService.submit(ForwardingListeningExecutorService.java:40)
at org.apache.hadoop.util.SemaphoredDelegatingExecutor.submit(SemaphoredDelegatingExecutor.java:112)
at com.google.common.util.concurrent.ForwardingListeningExecutorService.submit(ForwardingListeningExecutorService.java:40)
at org.apache.hadoop.util.SemaphoredDelegatingExecutor.submit(SemaphoredDelegatingExecutor.java:112)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:434)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:365)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.cancel(CheckpointFileManager.scala:163)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:507)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$2(package.scala:65)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$2$adapted(package.scala:64)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
at org.apache.spark.scheduler.Task.run(Task.scala:143)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
VisualVM 显示,线程数量从一开始就上升,直到达到最大值(~4.8K): image
其中大多数是:
- s3a-transfer-unbounded-poolXXX-tXX
- s3a-transfer-shared-poolXXX-tXX
据我了解,创建这些线程池的唯一地方是
org.apache.hadoop.fs.s3a.S3AFileSystem#initialize
Spark 每次都会创建新的文件系统
org.apache.spark.sql.execution.streaming.StreamMetadata#write
叫做。
为什么会这样?如何防止创建此线程?