问题标签 [spark-checkpoint]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
893 浏览

scala - Spark中的迭代缓存与检查点

我有一个在 Spark 上运行的迭代应用程序,我将其简化为以下代码:

后续作业中的内存分配会发生什么变化?

  • 当前是否anRDD“覆盖”了以前的,还是都保存在内存中?从长远来看,这可能会引发一些内存异常
  • localCheckpointcache不同的行为吗?如果localCheckpoint用来代替cache,localCheckpoint截断 RDD 沿袭,那么我希望之前的 RDD 会被覆盖
0 投票
0 回答
1077 浏览

apache-spark - 如何在计划的 Spark 批处理作业中找出 Kafka 的起始偏移量和结束偏移量?

我正在尝试从我的 Spark 批处理作业中的 Kafka 主题中读取并发布到另一个主题。我没有使用流媒体,因为它不适合我们的用例。根据 spark docs,批处理作业默认从最早的 Kafka 偏移量开始读取,因此当我再次运行该作业时,它会再次从最早的偏移量读取。如何确保作业从上次读取的位置获取下一个偏移量?

根据 Spark Kafka 集成文档,可以选择指定“startingOffsets”和“endingOffsets”。但是我该如何弄清楚它们呢?

我正在使用spark.read.format("kafka")API 从 Kafka 读取数据作为数据集。但我没有找到任何选项来从此数据集读取的开始和结束偏移范围。

0 投票
2 回答
2338 浏览

apache-spark - 火花检查点和本地检查点有什么区别?

火花检查点和本地检查点有什么区别?在创建本地检查点时,我在 spark UI 中看到了这一点:

在此处输入图像描述

它表明本地检查点保存在内存中。

0 投票
1 回答
331 浏览

apache-spark - 启用检查点的 Spark 流式处理 SQS

我浏览过多个站点,例如 https://spark.apache.org/docs/latest/streaming-programming-guide.html

https://data-flair.training/blogs/spark-streaming-checkpoint/

https://docs.databricks.com/spark/latest/rdd-streaming/developing-streaming-applications.html

一些链接讨论了我们如何编码,但它太抽象了,我需要很多时间来弄清楚它是如何工作的

0 投票
0 回答
363 浏览

apache-spark - 运行超过 10 分钟后,Spark 流式检查点抛出错误

我正在 EMR 上使用 SQS 执行 Streaming 作业,但是在运行 10 分钟后,它开始在后台抛出错误(尽管应用程序仍在运行),导致日志中出现大量噪音。

以下是为应用程序提供的 spakr conf

0 投票
1 回答
1077 浏览

pyspark - pyspark 检查点在本地机器上失败

我刚刚开始在本地机器上使用独立学习 pyspark。我无法让检查站工作。我把脚本归结为这个......

我得到这个输出......

该错误没有详细说明失败的原因。我怀疑我错过了一些火花配置,但不确定是什么......

0 投票
1 回答
305 浏览

apache-spark - How to handle failure scenario in Spark write to orc file

I have a use case where I am pushing the data from Mongodb to HDFS in orc file which runs every 1 day interval and appends the data in orc file existing in hdfs.

Now my concern is if while writing to orc file , the job somehow gets failed or stopped. How should I handle that scenario taking in consideration that some data is already written in orc file. I want to avoid duplicate in orc file.

Snippet for writing to orc file format -

I don't want to go for checkpoint the complete RDD as it will be very expensive operation. Also, I don't want to create multiple orc file. Requirement is to maintain single file only.

Any other solution or approach I should try ?

0 投票
1 回答
612 浏览

apache-spark - 火花检查点:错误 java.io.FileNotFoundException

我有一个当前的管道,我在其中对我的数据框进行了几次转换。

插入检查点以确保可接受的执行时间很重要。

但是,有时我会从任何检查点收到此错误:

你能建议一下吗?

0 投票
1 回答
123 浏览

apache-kafka - Spark Structured Streaming-是否可以将偏移量写入两次

我正在使用 spark 结构化流来使用来自 kafka 主题的数据并将数据写入另一个 kafka 接收器。

我想存储偏移量两次 - 从主题中读取一次并搅拌偏移量。其次,当将数据写入输出接收器并写入偏移量时,这可以通过提供检查点目录位置来实现,

是否可以写订阅主题时消耗的偏移量。

0 投票
1 回答
171 浏览

apache-spark - Spark Structured Streaming 使用 spark-acid writeStream(带检查点)抛出 org.apache.hadoop.fs.FileAlreadyExistsException

在我们的 Spark 应用程序中,我们使用Spark structured streaming. 它使用Kafka as input stream, &HiveAcid as writeStream到 Hive 表。对于,它是从以下HiveAcid位置调用的开源库:https ://github.com/qubole/spark-acidspark acidqubole

下面是我们的代码:

我们能够将应用程序部署到生产环境,并重新部署了几次(~ 10 次)而没有问题。然后它遇到了以下错误:

查询 hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] 异常终止:作业因阶段失败而中止:阶段 0.0 中的任务 3 失败 4 次,最近一次失败:在 0.0 阶段丢失任务 3.3(TID 42、10.236.7.228、执行程序 3):org.apache.hadoop.fs.FileAlreadyExistsException:/warehouse/tablespace/managed/hive/events/year=2020/month=5/day=客户端 10.236.7.228 的 18/delta_0020079_0020079/bucket_00003 已经存在 (...) 在 com.qubole.shaded.orc.impl.PhysicalFsWriter.(PhysicalFsWriter.java:95) 在 com.qubole.shaded.orc.impl.WriterImpl。 (WriterImpl.java:177) 在 com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.(WriterImpl.java:94) 在 com.qubole.shaded.hadoop.hive.ql.io.orc。 OrcFile.createWriter(OrcFile.java:334) 在 com.qubole.shaded.hadoop.hive.ql.io.orc。OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602) 在 com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423) 在 com.qubole.shaded.hadoop.hive.ql。 io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432) 在 com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484) 在 com.qubole.spark.hiveacid。 writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295) 在 com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153) 在 com.qubole.spark。 hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153) (...) at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala: 153)在 com.qubole.spark.hiveacid.writer.TableWriter$$anon$1。应用(TableWriter.scala:139)

每次重新启动应用程序时,都会显示不同的delta + bucket files已存在错误。但是,这些文件每次启动时都是新创建的(很可能),但不知道为什么会抛出错误。

任何指针将不胜感激。