0

我在数据块笔记本上的考拉中有一个大数据框(2000 万行,35 列)。我已经使用 python 对其执行了一些转换和加入(合并)操作,例如:

mdf.path_info =  mdf.path_info.transform(modify_path_info)
x = mdf[['providerid','domain_name']].groupby(['providerid']).apply(domain_features)
mdf = ks.merge( mdf, x[['domain_namex','domain_name_grouped']], left_index=True, right_index=True)
x = mdf.groupby(['providerid','uid']).apply(userspecificdetails)
mmdf = mdf.merge(x[['providerid','uid',"date_last_purch","lifetime_value","age"]], how="left", on=['providerid','uid'])

在这些操作之后,我想显示一些数据帧的行来验证生成的数据帧。我试图打印/显示这个大数据帧的 1-5 行,但由于 spark 的惰性评估性质,所有打印命令都会启动 6-12 个 spark 作业并永远运行,之后集群进入不可用状态,然后什么都没发生。

mdf.head() 

display(mdf)

mdf.take([1])

mdf.iloc[0]

还尝试转换为火花数据框,然后尝试:

df = mdf.to_spark()

df.show(1)

df.rdd.takeSample(False, 1, seed=0)

df.first()

我使用的集群配置是8worker_4core_8gb,这意味着每个工作节点和驱动程序节点是8.0 GB 内存、4 个内核、0.5 DBU在 Databricks 运行时版本:7.0(包括 Apache Spark 3.0.0、Scala 2.12)

有人可以通过建议一种更快、更快的方法来获取/打印大数据帧的一行并且不等待处理整个 2000 万行数据帧来提供帮助。

4

2 回答 2

0

您可以cache在转换为 spark 数据帧然后调用action.

df = mdf.to_spark()

# caches the result so the action called after this will use this cached
# result instead of re-computing the DAG
df.cache() 

df.show(1)

您可能希望通过以下方式释放用于缓存的内存:

df.unpersist()
于 2020-11-14T07:05:13.530 回答
0

由于延迟评估,在您编写时,Spark 将首先执行您的转换,然后显示一行。您可以做的是减小输入数据的大小,并在更小的数据集上进行转换,例如:

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sample

df.sample(False, 0.1, seed=0)
于 2020-11-10T17:45:43.830 回答