1

我将 Jupyter Notebook 与 PySpark 一起使用。在其中,我有一个数据框,该数据框具有包含这些列的列名和类型(整数,...)的模式。现在我使用 flatMap 之类的方法,但这会返回一个不再具有固定类型的元组列表。有没有办法做到这一点?

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- ...
 |-- ...
 |-- ratings: integer (nullable = true)

然后我使用 flatMap 对评级值进行一些计算(在此混淆):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()

现在我得到一个错误:

TypeError:无法推断类型的架构:

有没有办法通过保留模式来使用 map/flatMap/reduce?或者至少返回具有特定类型值的元组?

4

1 回答 1

2

首先,您使用了错误的功能。flatMapmapflatten假设您的数据如下所示:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])

的输出flatMap将相当于:

sc.parallelize(['foo', 0, 'bar', 5])

因此,您看到的错误。如果你真的想让它工作,你应该使用map

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]

接下来,DataFrame2.0 不再支持映射。您应该先提取rdd(见df.rdd.map上文)。

最后在 Python 和 JVM 之间传递数据效率极低。它不仅需要在 Python 和 JVM 之间通过相应的序列化/反序列化和模式推断(如果未显式提供模式)来传递数据,这也打破了惰性。最好将 SQL 表达式用于以下情况:

from pyspark.sql.functions import when

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))

如果出于某种原因您需要纯 Python 代码,UDF 可能是更好的选择。

于 2016-05-14T10:53:36.207 回答