如果我有一个不再需要的 RDD,如何从内存中删除它?以下是否足以完成这项工作:
del thisRDD
谢谢!
不,del thisRDD
还不够,它只会删除指向 RDD 的指针。您应该调用thisRDD.unpersist()
以删除缓存的数据。
供您参考,Spark 使用惰性计算模型,这意味着当您运行此代码时:
>>> thisRDD = sc.parallelize(xrange(10),2).cache()
您不会真正缓存任何数据,它只会在 RDD 执行计划中标记为“待缓存”。你可以这样检查:
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
但是当你在这个 RDD 之上至少调用一次操作时,它会被缓存:
>>> thisRDD.count()
10
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
您可以使用地址轻松检查 Spark UI 中的持久化数据和持久化级别http://<driver_node>:4040/storage
。你会看到那里del thisRDD
不会改变这个 RDD 的持久性,但thisRDD.unpersist()
会取消它,而你仍然可以在你的代码中使用 thisRDD(虽然它不会再存在于内存中并且每次都会重新计算它是查询)
简短的回答:以下代码应该可以解决问题:
import gc
del thisRDD
gc.collect()
解释:
即使你使用的是 PySpark,你的 RDD 的数据也是在 Java 端管理的,所以首先让我们问同样的问题,但是对于 Java 而不是 Python:
如果我使用 Java,并且我只是释放对我的 RDD 的所有引用,是否足以自动取消持久化它?
对于 Java,答案是肯定的,根据这个答案,当它被垃圾收集时,RDD 将自动取消持久化。(显然该功能已在此 PR中添加到 Spark 中。)
好的,在 Python 中会发生什么?如果我在 Python 中删除对我的 RDD 的所有引用,这是否会导致它们在 Java 端被删除?
PySpark 使用Py4J将对象从 Python 发送到 Java,反之亦然。根据Py4J 内存模型文档:
一旦对象在 Python VM 上被垃圾回收(引用计数 == 0),该引用就会在 Java VM 上被删除
但请注意:删除对 RDD 的 Python 引用不会导致它立即被删除。您必须等待 Python 垃圾收集器清理引用。您可以阅读 Py4J 解释以了解详细信息,他们推荐以下内容:
调用
gc.collect()
也通常有效。
好的,现在回到你原来的问题:
以下是否足以完成这项工作:
del thisRDD
几乎。 您应该删除对它的最后引用(即del thisRDD
),然后,如果您确实需要立即取消持久化 RDD **,请调用gc.collect()
.
**嗯,从技术上讲,这将立即删除 Java 端的引用,但是在 Java 的垃圾收集器真正执行 RDD 的终结器并因此取消持久化数据之前会有一点延迟。
Short answer: it depends.
According to pyspark v.1.3.0 source code, del thisRDD
should be enough for PipelinedRDD
, which is an RDD generated by Python mapper/reducer:
class PipelinedRDD(RDD):
# ...
def __del__(self):
if self._broadcast:
self._broadcast.unpersist()
self._broadcast = None
RDD
class on the other hand, doesn't have __del__
method (while it probably should), so you should call unpersist
method on your own.
Edit: __del__
method was deleted in this commit.
仅供参考,我会推荐gc.collect()
之后del
(如果 rdd 占用大量内存)。