2

如果我调用 map 或mapPartition并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?

目前我做类似的事情:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows,columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartition(combine)
4

4 回答 4

2

火花 >= 2.3.0

从 Spark 2.3.0 开始,可以使用 PandasSeriesDataFrame按分区或组。参见例如:

火花 < 2.3.0

创建本地 PySpark 的自然方法是什么

哪有这回事。Spark 分布式数据结构不能嵌套,或者您更喜欢不能嵌套动作或转换的另一个视角。

或熊猫数据框

这相对容易,但你必须至少记住几件事:

  • Pandas 和 Spark DataFrames 甚至不完全等同。这些是不同的结构,具有不同的属性,通常不能用另一种替换。
  • 分区可以为空。
  • 看起来你正在传递字典。请记住,基本 Python 字典是无序的(与collections.OrderedDict示例不同)。因此,传递列可能无法按预期工作。
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1
于 2015-12-23T23:11:01.190 回答
1

你可以使用toPandas()

pandasdf = mydf.toPandas()
于 2015-12-23T16:24:42.207 回答
1

实际上可以在执行器中将 Spark 行转换为 Pandas,最后使用mapPartitions. 在 Github 中查看我的要点

# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
    # convert rows to dict
    rows = (row_.asDict() for row_ in rdd_)
    # create pandas dataframe
    pdf = pd.DataFrame(rows)

    # Rows/Pandas DF can be empty depending on patiition logic.
    # Make sure to check it here, otherwise it will throw untrackable error
    if len(pdf) > 0:
        #
        # Do something with pandas DataFrame 
        #
        pass

    return pdf.to_dict(orient='records')

# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
于 2017-12-08T14:28:35.927 回答
0

为了创建 Spark SQL 数据框,您需要一个配置单元上下文:

hc = HiveContext(sparkContext)

使用 HiveContext,您可以通过 inferSchema 函数创建 SQL 数据框:

sparkSQLdataframe = hc.inferSchema(rows)  
于 2015-12-23T15:40:43.907 回答