1

我在databricks上使用带有火花流的检查点遇到了许多问题。下面的代码导致我们的集群出现 OOM 错误。调查集群的内存使用情况,我们可以看到随着时间的推移内存在缓慢增加,这表明内存泄漏(OOM 前约 10 天,而批处理只持续几分钟)。删除检查点以便创建新的检查点后,内存泄漏消失了,表明错误源于检查点。在类似的流式传输作业中,我们还遇到了一些数据从未被处理过的问题(再次,在重新创建检查点后修复)。

免责声明:我不完全理解检查点的深入行为,因为在线文档是回避的。因此,我不确定我的配置是否良好。

以下是该问题的一个最小示例:

pyspark 3.0.1,python 3.7

集群的 json conf 具有以下元素:

  "spark_conf": {
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true"
  }

代码:

import pandas as pd
from pyspark.sql import functions as F

def for_each_batch(data, epoch_id):
   pass

spark.readStream.format("delta").load("path/to/delta").filter(
F.col("TIME") > pd.Timestamp.utcnow() - pd.Timedelta(hours=1)
).writeStream.option(
"ignoreChanges", "true"
).option(
"checkpointLocation", "path/to/checkpoint"
).trigger(
processingTime="3 minutes"
).foreachBatch(
for_each_batch
).start()

PS:如果函数'for_each_batch'的内容或者过滤条件改变了,我应​​该重新创建检查点吗?

4

0 回答 0