4

在努力提高代码性能时,因为我有许多作业失败(中止),persist()当我需要在许多其他操作上使用相同的数据帧时,我考虑在 Spark Dataframe 上使用函数。在执行此操作并跟踪 Spark 应用程序 UI 中的工作和阶段时,我觉得这样做并不总是最佳的,这取决于分区的数量和数据大小。直到我因为坚持阶段的失败而中止了工作,我才确定。

我质疑在数据帧上执行许多操作时使用的最佳实践persist()是否始终有效? 如果不是,什么时候不是?如何判断?

更具体地说,我将介绍我的代码和中止工作的详细信息:

#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])

你可能会问我为什么坚持spark_df?这是因为我将多次使用它,例如 withproducts_df和 in joins(例如:spark_df = spark_df.join(df_products_indexed,"product_id")

工作阶段

第 3 阶段失败原因的详细信息:

作业因阶段故障而中止:阶段 3.0 中的任务 40458 失败 4 次,最近一次失败:阶段 3.0 中丢失任务 40458.3(TID 60778,xx.xx.yyyy.com,执行程序 91):ExecutorLostFailure(执行程序 91 因一个原因退出正在运行的任务)原因:从机丢失驱动程序堆栈跟踪:

输入数据的大小(4 TB)很大,在坚持之前有没有办法检查数据的大小?它是选择坚持还是不坚持的参数?还有大于 100,000的分区(任务)数persist

4

2 回答 2

2

这里有两种使用情况persist()

  • 使用repartition后为了避免在后续步骤使用数据帧时一次又一次地改组您的数据。这仅适用于您为持久化数据帧/RDD 调用多个操作的情况,因为持久化是一种转换,因此会延迟评估。一般来说,如果您对同一个数据帧/RDD 有多个操作。

  • 迭代计算,例如当您想在 for 循环中查询数据帧时。使用persistSpark 将保存中间结果并省略在每个操作调用上重新评估相同的操作。另一个示例是在新列中添加 a ,如此join所述。

于 2019-05-11T19:17:55.737 回答
1

我的经验告诉我,当您对数据框执行多项操作时,您应该保留数据框,以便创建临时表(同时确保如果出现故障,您有一个恢复点)。通过这样做,您可以防止通常不会结束的巨大 DAG,例如,如果您有连接。所以我的建议是做这样的事情:

# operations
df.write.saveAsTable('database.tablename_temp')
df = spark.table('database.tablename_temp')
# more operations
于 2019-04-25T11:36:41.303 回答