0

我正在尝试使用字典理解将 PySpark 排序行列表转换为 Pandas 数据框,但仅在明确说明所需字典的键和值时才有效。

row_list = sorted(data, key=lambda row: row['date'])

future_df = {'key': int(key),
             'date': map(lambda row: row["date"], row_list),
             'col1': map(lambda row: row["col1"], row_list),
             'col2': map(lambda row: row["col2"], row_list)} 

然后将其转换为 Pandas:

pd.DataFrame(future_df)

此操作可在由以下方式调用的类 ForecastByKey 中找到:

rdd = df.select('*')
    .rdd \
    .map(lambda row: ((row['key']), row)) \
    .groupByKey() \
    .map(lambda args: spark_ops.run(args[0], args[1]))

到目前为止,一切正常;意思是明确指出字典中的列future_df

尝试使用以下内容转换整组列(700+)时会出现问题:

future_df = {'key': int(key),
             'date': map(lambda row: row["date"], row_list)}

for col_ in columns:
    future_df[col_] = map(lambda row: row[col_], row_list)

pd.DataFrame(future_df)

其中columns包含传递给ForecastByKey类的每个 coumn 的名称。

此操作的结果是具有空列或接近零列的数据框。

我正在使用 Python 3.6.10 和 PySpark 2.4.5

为了获得具有正确信息的数据框,如何进行此迭代?

4

1 回答 1

0

经过一番研究,我意识到这可以通过以下方式解决:

row_list = sorted(data, key=lambda row: row['date'])

def f(x):
    return map(lambda row: row[x], row_list)

pre_df = {col_: col_ for col_ in self.sdf_cols}

future_df = toolz.valmap(f, pre_df)

future_df['key'] = int(key)
于 2020-07-10T07:35:10.880 回答