0

现实生活中的 df 是一个无法加载到驱动程序内存中的海量数据帧。这可以使用常规或 pandas udf 来完成吗?

# Code to generate a sample dataframe

from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import numpy as np

sample = [['123',[[0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1], [0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1]]],
      ['345',[[1,0,0,0,0,1,1,1,0,1,1,0,1,0,0,0,1,1,1,1,1,1], [0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1]]],
      ['425',[[1,1,0,0,0,1,0,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1],[0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1]]],
      ]

df = spark.createDataFrame(sample,["id", "data"])

这是需要在不依赖驱动程序内存的情况下并行化的逻辑。

输入:Spark 数据帧输出:要输入 horovod 的 numpy 数组(类似这样的:https ://docs.databricks.com/applications/deep-learning/distributed-training/mnist-tensorflow-keras.html )

pandas_df = df.toPandas() # Not possible in real life
data_array = np.asarray(list(pandas_df.data.values))
data_array = data_array.reshape(data_array.shape[0], data_array.shape[1], -1, 1, order='F')
data_array = data_array.reshape(data_array.shape[0],data_array.shape[1],-1,1,1,order="F").transpose(0,1,3,2,-1)
# Some more numpy specific transformations ..

这是一种不起作用的方法:

@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def generate_feature(x):
    data_array = np.asarray(x)
    data_array = data_array.reshape(data_array.shape[0], ..
    ...
    return pd.Series(data_array)

df = df.withColumn("data_array", generate_feature(df.data))
4

1 回答 1

0


我正在尝试使用图像来处理类似的案例。我期待Petastorm这样做。您可以将数据从 Rdd 保存为 Parquet 格式,然后在 horovod 中使用。
- 我还没有测试这个。
- 如何使用 horovod 中的排名获取部分数据集,也需要测试。
只是一个可以帮助的提示。
谢谢。

于 2020-03-06T11:30:56.157 回答