2

我很难理解为什么我无法运行转换,在等待了这么多分钟(有时是几个小时)后,它返回错误“序列化结果太大”。

在转换中,我有一个日期列表,我在 for 循环中迭代以在特定时间间隔内进行增量计算。

预期的数据集是迭代数据集的联合,应该包含 450k 行,不是太多,但我有很多计算阶段、任务和尝试!

配置文件已设置为中等配置文件,我无法在其他配置文件上进行缩放,也无法设置 maxResultSize = 0。

代码示例:

Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []

for date in Date_list:
     tmp = df.filter(between [date,  date-7days]).withColumn('example', F.lit(date))
     
     ........

     df2 = df.join(tmp, 'column', 'inner').......

     df_date += [df2]
df_total = df_total.unionByName(union_many(*df_date))

return df_total

不要注意语法。这只是一个例子,说明循环内部有一系列操作。我想要的输出是一个数据框,其中包含每次迭代的数据框!

谢谢!!

4

1 回答 1

5

初始理论

您遇到了 Spark 的已知限制,类似于此处讨论的发现。

但是,有一些方法可以解决这个问题,方法是重新考虑您的实现,而不是一系列发送的指令,描述您希望操作的数据批次,类似于您创建tmpDataFrame 的方式。

不幸的是,这可能需要更多的工作来以这种方式重新思考您的逻辑,因为您希望将您的操作纯粹想象为提供给 PySpark 的一系列列操作命令,而不是逐行操作。有些操作不能纯粹使用 PySpark 调用来完成,所以这并不总是可行的。一般来说,值得仔细考虑。

具体来说

例如,您的数据范围计算可以纯粹在 PySpark 中执行,如果您在多年或其他更大的范围内执行此操作,将会大大加快速度。我们没有使用 Python 列表理解或其他逻辑,而是对一小组初始数据使用列操作来构建我们的范围。

我在这里写了一些关于如何创建日期批处理的示例代码,这应该让您执行join创建tmpDataFrame,之后您可以描述您希望对其执行的操作类型。

创建日期范围的代码(一年中每周的开始和结束日期):

from pyspark.sql import types as T, functions as F, SparkSession, Window
from datetime import date

spark = SparkSession.builder.getOrCreate()

year_marker_schema = T.StructType([
  T.StructField("max_year", T.IntegerType(), False),
])
year_marker_data = [
    {"max_year": 2022}
]
year_marker_df = spark.createDataFrame(year_marker_data, year_marker_schema)
year_marker_df.show()

"""
+--------+
|max_year|
+--------+
|    2022|
+--------+
"""

previous_week_window = Window.partitionBy(F.col("start_year")).orderBy("start_week_index")

year_marker_df = year_marker_df.select(
    (F.col("max_year") - 1).alias("start_year"),
    "*"
).select(
    F.to_date(F.col("max_year").cast(T.StringType()), "yyyy").alias("max_year_date"),
    F.to_date(F.col("start_year").cast(T.StringType()), "yyyy").alias("start_year_date"),
    "*"
).select(
    F.datediff(F.col("max_year_date"), F.col("start_year_date")).alias("days_between"),
    "*"
).select(
    F.floor(F.col("days_between") / 7).alias("weeks_between"),
    "*"
).select(
    F.sequence(F.lit(0), F.col("weeks_between")).alias("week_indices"),
    "*"
).select(
    F.explode(F.col("week_indices")).alias("start_week_index"),
    "*"
).select(
    F.lead(F.col("start_week_index"), 1).over(previous_week_window).alias("end_week_index"),
    "*"
).select(
    ((F.col("start_week_index") * 7) + 1).alias("start_day"),
    ((F.col("end_week_index") * 7) + 1).alias("end_day"),
    "*"
).select(
    F.concat_ws(
        "-",
        F.col("start_year"),
        F.col("start_day").cast(T.StringType())
    ).alias("start_day_string"),
    F.concat_ws(
        "-",
        F.col("start_year"),
        F.col("end_day").cast(T.StringType())
    ).alias("end_day_string"),
    "*"
).select(
    F.to_date(
        F.col("start_day_string"),
        "yyyy-D"
    ).alias("start_date"),
    F.to_date(
        F.col("end_day_string"),
        "yyyy-D"
    ).alias("end_date"),
    "*"
)

year_marker_df.drop(
    "max_year",
    "start_year",
    "weeks_between",
    "days_between",
    "week_indices",
    "max_year_date",
    "start_day_string",
    "end_day_string",
    "start_day",
    "end_day",
    "start_week_index",
    "end_week_index",
    "start_year_date"
).show()

"""
+----------+----------+
|start_date|  end_date|
+----------+----------+
|2021-01-01|2021-01-08|
|2021-01-08|2021-01-15|
|2021-01-15|2021-01-22|
|2021-01-22|2021-01-29|
|2021-01-29|2021-02-05|
|2021-02-05|2021-02-12|
|2021-02-12|2021-02-19|
|2021-02-19|2021-02-26|
|2021-02-26|2021-03-05|
|2021-03-05|2021-03-12|
|2021-03-12|2021-03-19|
|2021-03-19|2021-03-26|
|2021-03-26|2021-04-02|
|2021-04-02|2021-04-09|
|2021-04-09|2021-04-16|
|2021-04-16|2021-04-23|
|2021-04-23|2021-04-30|
|2021-04-30|2021-05-07|
|2021-05-07|2021-05-14|
|2021-05-14|2021-05-21|
+----------+----------+
only showing top 20 rows
"""

潜在的优化

一旦你有了这个代码,如果你不能单独通过连接/列派生来表达你的工作并且被迫使用 执行操作,你可以考虑在你的结果上union_many使用 Spark 的localCheckpoint功能。df2这将允许 Spark 简单地计算结果 DataFrame,而不是将其查询计划添加到您将推送到df_total. 这可以与缓存配对以将结果 DataFrame 保存在内存中,但这取决于您的数据规模。

localCheckpoint并且cache对于避免多次重新计算相同的 DataFrame 和截断在中间 DataFrame 之上完成的查询计划量很有用。

您可能会发现localCheckpoint并且cache可能对您的dfDataFrame 有用,因为它将在您的循环中多次使用(假设您无法重新设计逻辑以使用基于 SQL 的操作,而是仍然被迫使用环形)。

作为何时使用每个的快速而肮脏的总结:

localCheckpoint在计算复杂的 DataFrame 上使用,稍后将在操作中使用。通常这些是馈入unions的节点

在以后将要使用cache多次的 DataFrame 上使用。这通常是位于 for/while 循环之外的 DataFrame,将在循环中调用

全部一起

您的初始代码

Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []

for date in Date_list:
     tmp = df.filter(between [date,  date-7days]).withColumn('example', F.lit(date))
     
     ........

     df2 = df.join(tmp, 'column', 'inner').......

     df_date += [df2]
df_total = df_total.unionByName(union_many(*df_date))

return df_total

现在应该看起来像:

# year_marker_df as derived in my code above

year_marker_df = year_marker_df.cache()

df = df.join(year_marker_df, df.my_date_column between year_marker_df.start_date, year_marker_df.end_date)
# Other work previously in your for_loop, resulting in df_total
return df_total

或者,如果您无法重新进行内部循环操作,您可以进行一些优化,例如:

Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []

df = df.cache()
for date in Date_list:
     tmp = df.filter(between [date,  date-7days]).withColumn('example', F.lit(date))
     
     ........

     df2 = df.join(tmp, 'column', 'inner').......

     df2 = df2.localCheckpoint()
     df_date += [df2]

df_total = df_total.unionByName(union_many(*df_date))

return df_total
于 2022-01-24T17:32:24.073 回答