62

我需要使用

(rdd.)partitionBy(npartitions, custom_partitioner)

DataFrame 上不可用的方法。所有 DataFrame 方法仅引用 DataFrame 结果。那么如何从 DataFrame 数据中创建一个 RDD 呢?

注意:这是对 1.2.0 的更改(在 1.3.0 中)。

从@dpangmao 的答案更新:方法是.rdd。我有兴趣了解 (a) 它是否是公开的,以及 (b) 对性能的影响是什么。

好吧,(a)是肯定的,(b)——你可以在这里看到有显着的性能影响:必须通过调用mapPartitions创建一个新的 RDD :

dataframe.py中(注意文件名也发生了变化(原为 sql.py):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
4

3 回答 3

102

使用这样的方法.rdd

rdd = df.rdd
于 2015-03-18T17:36:16.587 回答
83

@dapangmao 的答案有效,但它没有给出常规的 spark RDD,它返回一个 Row 对象。如果你想拥有常规的 RDD 格式。

尝试这个:

rdd = df.rdd.map(tuple)

或者

rdd = df.rdd.map(list)
于 2016-05-17T21:13:31.803 回答
6

kennyut/Kistian 给出的答案效果很好,但是当RDD 包含属性列表时,要获得类似 RDD 的输出,例如 [1,2,3,4],我们可以使用下面的 flatmap 命令,

rdd = df.rdd.flatMap(list)
or 
rdd = df.rdd.flatmap(lambda x: list(x))
于 2018-05-14T17:39:58.133 回答