1

在 Databricks 上使用 pyspark/Delta 湖,我有以下场景:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)

analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

据我了解,由于链式执行,带有 Delta 湖的 Sparkresult实际上并不是在声明时计算,而是在使用时计算。

然而,在这个例子中,它被多次使用,因此最昂贵的转换被多次计算。

是否可以在代码中的某个点强制执行,例如

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??

analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??
4

1 回答 1

0

在我看来,问题无处不在,或者说不清楚。但是,如果您是 Spark 的新手,情况可能就是这样。

所以:

有关.force的使用,请参阅https://blog.knoldus.com/getting-lazy-with-scala/ .force 不适用于数据集或数据框。

这与 pyspark 或 Delta Lake 方法有关吗?不,不。

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • 这实际上是一个在转换之前最有可能导致洗牌的动作。

所以,我认为您的意思是,正如我们尊敬的 pault 所说,以下内容:

  • .cache 或 .persist

你需要我怀疑:

result.cache 

这意味着您的2nd Action analysis_2不需要一直重新计算到此处显示的源

(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43: 
8/8
succeeded / total tasks 
Stage 44: 
200/200
succeeded / total tasks 
Stage 45:   
1/1
succeeded / total tasks 
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46: 
0/8
succeeded / total tasks skipped
Stage 47: 
200/200
succeeded / total tasks 
Stage 48:   
1/1
succeeded / total tasks 

随着对 Spark 的改进,shuffle 分区被保留,在某些情况下也会导致跳过阶段,尤其是对于 RDD。对于数据帧,需要缓存才能获得我观察到的跳过阶段效果。

于 2019-11-02T11:02:52.237 回答