问题标签 [spark-checkpoint]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 数据框检查点示例 Pyspark
我阅读了有关检查点的信息,它看起来很适合我的需求,但我找不到如何使用它的好例子。
我的问题是:
我应该指定检查点目录吗?是否可以这样做:
df.checkpoint()
是否有任何我应该注意的可选参数?
是否有默认检查点目录,或者我必须指定一个作为默认目录?
当我检查点数据框并重用它时-它会自动从我们编写文件的目录中读取数据吗?
如果您可以与我分享在 pyspark 中使用检查点的示例并提供一些解释,那就太好了。谢谢!
apache-spark - Error while writing data to console in spark streaming-- checkpoint location issue
I am trying to develop one simple streaming application. My code is working fine in batch processing but when i am dealing with streaming data, I am facing issue while writing the output to console.
Below code i am using for console output:
query = hashtags_count
.writeStream
.format("console")
.option("checkpointLocation", "chk-point-dir")
.outputMode("complete")
.start()
Error message:
ERROR StreamMetadata: Error writing stream metadata StreamMetadata(7db81fcd-b040-47cb-a857-04c84af53a6b)
I have installed spark locally on windows.
Please help to fix this issue
python - PySpark - 读取检查点数据帧
我目前正在使用 pyspark 为机器学习应用程序执行一些数据清理。最后一个会话崩溃了,但我设置了一个检查点目录并检查了我的数据帧。
现在我有以下形式的检查点数据目录:
rdd-subfolders 中的文件似乎是十六进制文件。
我怎样才能阅读这个检查点,以便我可以继续我的数据准备而不是再次运行整个过程?
scala - 为什么 checkpoint() 比 persist() 快
我有一个使用 DataFrame 进行计算的代码。
我试图以不同的方式缓存表格。
我得到了让我思考的结果。
为什么 checkpoint(false) 比 persist() 更有效?毕竟,检查点需要时间来序列化对象并将它们写入磁盘。
PS 我在 GitHub 上的小项目:https ://github.com/MinorityMeaning/CacheCheckpoint
pyspark - 在 Databricks 上使用带有火花流 (pyspark) 的检查点的 OOM 和数据丢失问题
我在databricks上使用带有火花流的检查点遇到了许多问题。下面的代码导致我们的集群出现 OOM 错误。调查集群的内存使用情况,我们可以看到随着时间的推移内存在缓慢增加,这表明内存泄漏(OOM 前约 10 天,而批处理只持续几分钟)。删除检查点以便创建新的检查点后,内存泄漏消失了,表明错误源于检查点。在类似的流式传输作业中,我们还遇到了一些数据从未被处理过的问题(再次,在重新创建检查点后修复)。
免责声明:我不完全理解检查点的深入行为,因为在线文档是回避的。因此,我不确定我的配置是否良好。
以下是该问题的一个最小示例:
pyspark 3.0.1,python 3.7
集群的 json conf 具有以下元素:
代码:
PS:如果函数'for_each_batch'的内容或者过滤条件改变了,我应该重新创建检查点吗?
apache-spark - Apache Spark Structured Streaming - 不写入检查点位置
我有一个简单的 Apache Spark Structured Streaming python 代码,它从 Kafka 读取数据,并将消息写入控制台。
我已经设置了检查点位置,但是代码没有写入检查点..任何想法为什么?
这是代码:
spark-streaming - Spark Streaming - 检查点引用 Vaccumed 文件并因 Filenotfound 而失败
我有火花流作业,它开始因 FileNotFound 异常而失败
因为 1.Sparkstreaming 与 maxFilesPerTrigger 一起运行,这导致创建大量未处理的文件积压 2.来自积压的文件已被清理和压缩 3.当作业运行时,它会尝试从上次离开的位置恢复(检查点已尚未处理的文件列表{这些文件现在是 vacummed&compacted}) 4.job 由于上述第 3 点而因 FileNotFound 异常而失败
我如何克服这个问题并在它停止的地方恢复加载表单。
谢谢,拉胡尔
google-cloud-dataproc - dataproc 火花检查点最佳实践?我也应该设置检查点目录吗?
我正在运行一个非常长时间运行的批处理作业。它会产生很多OOM异常。为了最小化这个问题,添加了 checkpoints()
我应该在哪里设置检查点目录?该位置必须可供所有执行者访问。目前,我正在使用一个桶。根据日志文件,我可以看到我的代码已经通过了几个 checkpoint() 调用,但是存储桶是空的
sparkContext.setCheckpointDir("gs://myBucket/checkpointDir/")
根据 CPU 利用率和日志消息,看起来我的工作仍在运行并取得进展。知道检查点数据的火花在哪里吗?
亲切的问候
安迪
apache-spark - 如何减少火花流写入的检查点文件的数量
如果 Spark Streaming 作业涉及 shuffle 和有状态处理,那么每个微批次很容易生成大量小文件。我们应该在不影响延迟的情况下减少文件数量。
python - Spark structured streaming - reading from last read processed message after service restart
I am currently reading from a kafka topic, processing the messages and writing them to another topic. This processing and producing logic is inside the test_saprk function. A code sample can be found below:
The problem is when I restart the service while it's processing the messages. What I wanted was for spark to pick up from the last read/processed message. Instead it processes them all from the beginning, so I end up with duplicated messages (if at the time of the service restart I had Y messages processed, I end up with TotalMessages+Y).
Shouldn't the checkpoints prevent this? Any ideas on how to fix it? Thanks
UPDATE: I think i have realised that it's only committing offsets for each batch. Sometimes I have only one batch, so that's why it's duplicating the messages. But this doesn't really apply for my use case, as I want to avoid duplicated messages as much as possible, so using this batch logic is not the best. But using a foreach would do it force me to work row by row, not being able to make use of the foreachPartition method that I use inside of the foreachBatch. Any ideas?