24

如果我有一个不再需要的 RDD,如何从内存中删除它?以下是否足以完成这项工作:

del thisRDD

谢谢!

4

4 回答 4

15

不,del thisRDD还不够,它只会删除指向 RDD 的指针。您应该调用thisRDD.unpersist()以删除缓存的数据。

供您参考,Spark 使用惰性计算模型,这意味着当您运行此代码时:

>>> thisRDD = sc.parallelize(xrange(10),2).cache()

您不会真正缓存任何数据,它只会在 RDD 执行计划中标记为“待缓存”。你可以这样检查:

>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

但是当你在这个 RDD 之上至少调用一次操作时,它会被缓存:

>>> thisRDD.count()
10
>>> print thisRDD.toDebugString()
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

您可以使用地址轻松检查 Spark UI 中的持久化数据和持久化级别http://<driver_node>:4040/storage。你会看到那里del thisRDD不会改变这个 RDD 的持久性,但thisRDD.unpersist()会取消它,而你仍然可以在你的代码中使用 thisRDD(虽然它不会再存在于内存中并且每次都会重新计算它是查询)

于 2015-01-19T15:41:03.857 回答
14

简短的回答:以下代码应该可以解决问题:

import gc
del thisRDD
gc.collect()

解释:

即使你使用的是 PySpark,你的 RDD 的数据也是在 Java 端管理的,所以首先让我们问同样的问题,但是对于 Java 而不是 Python:

如果我使用 Java,并且我只是释放对我的 RDD 的所有引用,是否足以自动取消持久化它?

对于 Java,答案是肯定的,根据这个答案,当它被垃圾收集时,RDD 将自动取消持久化。(显然该功能已在此 PR中添加到 Spark 中。)

好的,在 Python 中会发生什么?如果我在 Python 中删除对我的 RDD 的所有引用,这是否会导致它们在 Java 端被删除?

PySpark 使用Py4J将对象从 Python 发送到 Java,反之亦然。根据Py4J 内存模型文档

一旦对象在 Python VM 上被垃圾回收(引用计数 == 0),该引用就会在 Java VM 上被删除

但请注意:删除对 RDD 的 Python 引用不会导致它立即被删除。您必须等待 Python 垃圾收集器清理引用。您可以阅读 Py4J 解释以了解详细信息,他们推荐以下内容:

调用gc.collect()也通常有效。

好的,现在回到你原来的问题:

以下是否足以完成这项工作:

del thisRDD

几乎。 您应该删除对它的最后引用(即del thisRDD),然后,如果您确实需要立即取消持久化 RDD **,请调用gc.collect().

**嗯,从技术上讲,这将立即删除 Java 端的引用,但是在 Java 的垃圾收集器真正执行 RDD 的终结器并因此取消持久化数据之前会有一点延迟。

于 2016-10-10T21:35:06.333 回答
5

Short answer: it depends.

According to pyspark v.1.3.0 source code, del thisRDD should be enough for PipelinedRDD, which is an RDD generated by Python mapper/reducer:

class PipelinedRDD(RDD):
    # ...
    def __del__(self):
        if self._broadcast:
            self._broadcast.unpersist()
            self._broadcast = None

RDD class on the other hand, doesn't have __del__ method (while it probably should), so you should call unpersist method on your own.

Edit: __del__ method was deleted in this commit.

于 2015-02-04T21:07:54.827 回答
3

仅供参考,我会推荐gc.collect()之后del(如果 rdd 占用大量内存)。

于 2016-07-26T22:01:15.793 回答