问题标签 [spark-checkpoint]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
939 浏览

apache-spark - 数据框检查点示例 Pyspark

我阅读了有关检查点的信息,它看起来很适合我的需求,但我找不到如何使用它的好例子。
我的问题是:

  1. 我应该指定检查点目录吗?是否可以这样做:

    df.checkpoint()

  2. 是否有任何我应该注意的可选参数?

  3. 是否有默认检查点目录,或者我必须指定一个作为默认目录?

  4. 当我检查点数据框并重用它时-它会自动从我们编写文件的目录中读取数据吗?

如果您可以与我分享在 pyspark 中使用检查点的示例并提供一些解释,那就太好了。谢谢!

0 投票
0 回答
62 浏览

apache-spark - Error while writing data to console in spark streaming-- checkpoint location issue

I am trying to develop one simple streaming application. My code is working fine in batch processing but when i am dealing with streaming data, I am facing issue while writing the output to console.

Below code i am using for console output:

query = hashtags_count
.writeStream
.format("console")
.option("checkpointLocation", "chk-point-dir")
.outputMode("complete")
.start()

Error message:

ERROR StreamMetadata: Error writing stream metadata StreamMetadata(7db81fcd-b040-47cb-a857-04c84af53a6b)

I have installed spark locally on windows.

Please help to fix this issue

0 投票
0 回答
29 浏览

python - PySpark - 读取检查点数据帧

我目前正在使用 pyspark 为机器学习应用程序执行一些数据清理。最后一个会话崩溃了,但我设置了一个检查点目录并检查了我的数据帧。

现在我有以下形式的检查点数据目录:

rdd-subfolders 中的文件似乎是十六进制文件。

我怎样才能阅读这个检查点,以便我可以继续我的数据准备而不是再次运行整个过程?

0 投票
1 回答
113 浏览

scala - 为什么 checkpoint() 比 persist() 快

我有一个使用 DataFrame 进行计算的代码。

我试图以不同的方式缓存表格。

我得到了让我思考的结果。

在此处输入图像描述

为什么 checkpoint(false) 比 persist() 更有效?毕竟,检查点需要时间来序列化对象并将它们写入磁盘。

PS 我在 GitHub 上的小项目:https ://github.com/MinorityMeaning/CacheCheckpoint

0 投票
0 回答
91 浏览

pyspark - 在 Databricks 上使用带有火花流 (pyspark) 的检查点的 OOM 和数据丢失问题

我在databricks上使用带有火花流的检查点遇到了许多问题。下面的代码导致我们的集群出现 OOM 错误。调查集群的内存使用情况,我们可以看到随着时间的推移内存在缓慢增加,这表明内存泄漏(OOM 前约 10 天,而批处理只持续几分钟)。删除检查点以便创建新的检查点后,内存泄漏消失了,表明错误源于检查点。在类似的流式传输作业中,我们还遇到了一些数据从未被处理过的问题(再次,在重新创建检查点后修复)。

免责声明:我不完全理解检查点的深入行为,因为在线文档是回避的。因此,我不确定我的配置是否良好。

以下是该问题的一个最小示例:

pyspark 3.0.1,python 3.7

集群的 json conf 具有以下元素:

代码:

PS:如果函数'for_each_batch'的内容或者过滤条件改变了,我应​​该重新创建检查点吗?

0 投票
1 回答
34 浏览

apache-spark - Apache Spark Structured Streaming - 不写入检查点位置

我有一个简单的 Apache Spark Structured Streaming python 代码,它从 Kafka 读取数据,并将消息写入控制台。

我已经设置了检查点位置,但是代码没有写入检查点..任何想法为什么?

这是代码:

0 投票
0 回答
8 浏览

spark-streaming - Spark Streaming - 检查点引用 Vaccumed 文件并因 Filenotfound 而失败

我有火花流作业,它开始因 FileNotFound 异常而失败

因为 1.Sparkstreaming 与 maxFilesPerTrigger 一起运行,这导致创建大量未处理的文件积压 2.来自积压的文件已被清理和压缩 3.当作业运行时,它会尝试从上次离开的位置恢复(检查点已尚未处理的文件列表{这些文件现在是 vacummed&compacted}) 4.job 由于上述第 3 点而因 FileNotFound 异常而失败

我如何克服这个问题并在它停止的地方恢复加载表单。

谢谢,拉胡尔

0 投票
1 回答
47 浏览

google-cloud-dataproc - dataproc 火花检查点最佳实践?我也应该设置检查点目录吗?

我正在运行一个非常长时间运行的批处理作业。它会产生很多OOM异常。为了最小化这个问题,添加了 checkpoints()

我应该在哪里设置检查点目录?该位置必须可供所有执行者访问。目前,我正在使用一个桶。根据日志文件,我可以看到我的代码已经通过了几个 checkpoint() 调用,但是存储桶是空的

sparkContext.setCheckpointDir("gs://myBucket/checkpointDir/")

根据 CPU 利用率和日志消息,看起来我的工作仍在运行并取得进展。知道检查点数据的火花在哪里吗?

亲切的问候

安迪

0 投票
1 回答
13 浏览

apache-spark - 如何减少火花流写入的检查点文件的数量

如果 Spark Streaming 作业涉及 shuffle 和有状态处理,那么每个微批次很容易生成大量小文件。我们应该在不影响延迟的情况下减少文件数量。

0 投票
0 回答
22 浏览

python - Spark structured streaming - reading from last read processed message after service restart

I am currently reading from a kafka topic, processing the messages and writing them to another topic. This processing and producing logic is inside the test_saprk function. A code sample can be found below:

The problem is when I restart the service while it's processing the messages. What I wanted was for spark to pick up from the last read/processed message. Instead it processes them all from the beginning, so I end up with duplicated messages (if at the time of the service restart I had Y messages processed, I end up with TotalMessages+Y).

Shouldn't the checkpoints prevent this? Any ideas on how to fix it? Thanks

UPDATE: I think i have realised that it's only committing offsets for each batch. Sometimes I have only one batch, so that's why it's duplicating the messages. But this doesn't really apply for my use case, as I want to avoid duplicated messages as much as possible, so using this batch logic is not the best. But using a foreach would do it force me to work row by row, not being able to make use of the foreachPartition method that I use inside of the foreachBatch. Any ideas?