我需要使用
(rdd.)partitionBy(npartitions, custom_partitioner)
DataFrame 上不可用的方法。所有 DataFrame 方法仅引用 DataFrame 结果。那么如何从 DataFrame 数据中创建一个 RDD 呢?
注意:这是对 1.2.0 的更改(在 1.3.0 中)。
从@dpangmao 的答案更新:方法是.rdd。我有兴趣了解 (a) 它是否是公开的,以及 (b) 对性能的影响是什么。
好吧,(a)是肯定的,(b)——你可以在这里看到有显着的性能影响:必须通过调用mapPartitions创建一个新的 RDD :
在dataframe.py中(注意文件名也发生了变化(原为 sql.py):
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd