0

我在 ec2 集群上运行 4 个工作人员的 pyspark 作业。我收到此错误:

2018-07-05 08:20:44 WARN  TaskSetManager:66 - Lost task 1923.0 in stage 18.0 (TID 21385, 10.0.5.97, executor 3): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:260)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
at org.apache.spark.serializer.DummySerializerInstance$1.close(DummySerializerInstance.java:65)
at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:173)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:194)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:416)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:230)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

我查看了https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

尝试增加 shuffle partitioned - 同样的问题。我的数据在 executors 之间看起来相当均匀。我想尝试将 Null 或 None 分配给数据帧的解决方法,问题是它是否确实会删除中间 shuffle 文件,以及是否不会保留 linage。

例如,如果我的代码如下所示:

df1 = sqlContext.read.parquet(...)
df2= df1.filter()
df3 = df2.groupBy(*groupList).agg(....)

我会放

df1 = Null

像 1 之后 - 它会节省 shuffle 空间,是否需要并且将为 df2 , df3 重新计算?

第二个问题 - 检查点 df1 或 df2 是否有助于打破 linage?

处理大于我的存储的数据时,什么是可行的解决方案(大约处理 400GB 的原始数据)

更新 删除需要此数据帧的 2 个阶段之间的数据帧缓存有帮助,我没有收到任何错误。我想知道它对中间 shuffle 文件有什么帮助。

4

1 回答 1

2

我确实遇到过类似的情况。原因是在使用group by操作和joins数据时会被打乱。由于此shuffle数据是在 spark 应用程序中执行时的临时数据,因此将存储在文件中指向的目录中spark.local.dir,该spark-defaults.conf目录通常是tmp空间较小的目录。

通常,为了避免文件中的此错误,请将spark-defaults.conf文件更新spark.local.dir到具有更多内存的位置。

于 2018-07-05T17:44:41.257 回答