15

我有一个 rdd(我们可以称之为 myrdd),其中 rdd 中的每条记录都采用以下形式:

[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)]

我想将其转换为 pyspark 中的 DataFrame - 最简单的方法是什么?

4

4 回答 4

32

使用toDF方法怎么样?您只需要添加字段名称。

df = rdd.toDF(['column', 'value'])
于 2015-04-09T19:23:10.707 回答
15

@dapangmao 的回答让我得到了这个解决方案:

my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF()
于 2015-04-10T20:48:49.177 回答
4

看看DataFrame 文档以使此示例适合您,但这应该可以。我假设你的 RDD 被称为my_rdd

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# You have a ton of columns and each one should be an argument to Row
# Use a dictionary comprehension to make this easier
def record_to_row(record):
    schema = {'column{i:d}'.format(i = col_idx):record[col_idx] for col_idx in range(1,100+1)}
    return Row(**schema)


row_rdd = my_rdd.map(lambda x: record_to_row(x))

# Now infer the schema and you have a DataFrame
schema_my_rdd = sqlContext.inferSchema(row_rdd)

# Now you have a DataFrame you can register as a table
schema_my_rdd.registerTempTable("my_table")

我在 Spark 中使用 DataFrames 的工作不多,但这应该可以解决问题

于 2015-04-07T21:51:36.423 回答
1

在 pyspark 中,假设您有一个名为userDF的数据框。

>>> type(userDF)
<class 'pyspark.sql.dataframe.DataFrame'>

让我们将其转换为 RDD (

userRDD = userDF.rdd
>>> type(userRDD)
<class 'pyspark.rdd.RDD'>

现在您可以进行一些操作并调用例如 map 函数:

newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']})

最后,让我们从弹性分布式数据集 ( RDD )创建一个 DataFrame 。

newDF = sqlContext.createDataFrame(newRDD, ["food", "name"])

>>> type(ffDF)
<class 'pyspark.sql.dataframe.DataFrame'>

就这样。

在我尝试拨打电话之前,我曾收到此警告消息:

newDF = sc.parallelize(newRDD, ["food","name"] : 

.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row inst  warnings.warn("Using RDD of dict to inferSchema is deprecated. "

所以没有必要再这样做了......

于 2016-09-09T04:20:50.600 回答