问题标签 [spark-checkpoint]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3327 浏览

apache-spark - Spark Structure Streaming 失败,因为找不到检查点文件

我在测试环境中运行 spark 结构化流。有时会发生由于找不到某些检查点文件而导致作业失败的情况。

一个原因可能是 kafka 主题的保留时间非常短。但我已经添加.option("failOnDataLoss", "false")到 SparkSession。

我对火花检查点有一些基本(非常基本)的了解。我想如果我删除了检查点目录,它应该可以恢复。但是正如我测试的那样,一旦发生此错误,删除目录就无济于事。我需要使用不同的检查点目录来修复它。

为什么删除检查点目录不起作用?或者有没有办法/选项可以帮助避免这个错误?

0 投票
0 回答
78 浏览

apache-spark - Spark 驱动程序中使用的检查点变量

我正在从 Kafka 流式传输数据,并且还在我的应用程序中维护状态(通过使用 updateStateByKey),所以我强制需要检查我的数据。这运作良好。

除了来自 kafka 的数据之外,我还使用一些局部变量来保存总记录等信息,以及一些特定于我的应用程序的信息,这些信息会根据我们从 Kafka 收到的数据而变化。

所以我也想保留这些信息,以便在驱动程序失败的情况下恢复。有没有办法检查这些额外的数据?

0 投票
1 回答
156 浏览

hdfs - 检查点流数据到 HDFS 集群

我有一个 HDFS 集群,它有两个 NameNode。通常,如果使用 HDFS 客户端来保存数据,它会在其中一个出现故障时负责使用哪个 NameNode。

但是在 Spark 中,对于检查点,API 是:StreamingCONtext.checkpoint("hdfs://100.90.100.11:9000/sparkData")。

这里我只能指定一个NameNode,如果出现故障,Spark 没有智能切换到第二个。

有人能帮我一下吗?

有没有办法,如果我将此 XML 放在类路径中,Spark 可以理解“hdfs-site.xml”(其中包含两个名称节点的信息)。

0 投票
1 回答
581 浏览

apache-spark - hdfs URI 未解析

在我当前的 spark 应用程序中,我正在检查 hdfs,hdfs URI 如下所示

我收到一个错误 org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/tmp/

我观察到///已解决/

这是一个错误还是我缺少任何配置。谢谢

0 投票
1 回答
1089 浏览

apache-spark - 执行器失败后 Spark 无法在 HDFS 中找到检查点数据

我正在从 Kafka 发送数据,如下所示:

我有四个工作线程和这个应用程序的多个执行程序,我正在尝试检查 Spark 的容错性。

由于我们使用的是 mapWithState,spark 正在检查点数据到 HDFS,所以如果任何 executor/worker 出现故障,我们应该能够恢复丢失的数据(数据丢失在死 executor 中),并继续使用剩余的 executor/workers。

所以我杀死了一个工作节点以查看应用程序是否仍然运行顺利,但是我在 HDFS 中得到了 FileNotFound 的异常,如下所示:

这有点奇怪,因为 Spark 有时在 HDFS 中检查点数据,为什么它无法找到它。显然 HDFS 没有删除任何数据,所以为什么会出现这个异常。

或者我在这里错过了什么?

进一步更新:我发现 Spark 试图在 HDFS 中查找的 RDD 已被“ReliableRDDCheckpointData”进程删除,它为检查点数据创建了一个新的 RDD。DAG 以某种方式指向这个旧的 RDD。如果对此数据有任何引用,则不应将其删除。

0 投票
1 回答
4325 浏览

hive - 从 hive 表中读取并更新 pyspark 中的同一张表 - 使用检查点

我正在使用 spark 2.3 版并尝试将 spark 中的配置单元表读取为:

在这里,我将一个具有当前日期的新列从系统添加到现有数据框

现在面临一个问题,当我尝试将此数据框写入配置单元表时

所以我正在检查数据帧以打破血统,因为我正在从同一个数据帧读取和写入

这样它就可以正常工作,并且新列已添加到配置单元表中。但是每次创建检查点文件时我都必须删除它。有没有最好的方法来打破血统并使用更新的列详细信息写入相同的数据帧并将其保存到 hdfs 位置或作为配置单元表。

或者有没有办法为检查点目录指定一个临时位置,在火花会话完成后将被删除。

0 投票
0 回答
344 浏览

apache-spark - Spark 检查点:内容、恢复和幂等性

我正在尝试了解检查点的内容和相应的恢复;理解检查点的过程显然是自然的方式,所以我浏览了以下列表:

我仍然在努力理解检查点末尾的磁盘上发生了什么。

我对 Spark 检查点的理解:

如果你有很长的 DAG 并且你的 spark 集群失败了,检查点通过将中间状态持久化到 HDFS 来帮助。因此,在检查点的帮助下,可以将 50 个转换的 DAG 减少到 4-5 个转换。它虽然打破了 DAG。

流式传输中的检查点

我的 Spark Streaming 作业有一个 5 秒的微批处理。据我了解,JobScheduler每 5 秒提交一个新作业,它调用JobGenerator从DStreamGraph为新的微批次生成RDD DAG ,同时接收器继续为下一个新的微批次收集数据循环。据我了解,如果我启用检查点,它将定期保持检查点“当前状态”。

问题:

  1. 这个“状态”是什么?这是仅针对当前微批次的基本 RDD 和 DAG 的运算符/转换状态的组合吗?所以我有以下内容:

    由于T=12的检查点,磁盘上的具体内容是什么?它会只存储ubatch 2的 DAG 运算符的当前状态吗?

    一个。如果是,则在T=100恢复期间,最后一个可用检查点位于T=12。已成功处理的T=15的ubatch 3会发生什么情况。应用程序是否在此处重新处理ubatch 3并处理幂等性?如果是,我们是否会转到流媒体源(例如 Kafka)并倒回偏移量以便能够重播从ubatch 3开始的内容?

    湾。如果不是,那么在 T=12 时检查点目录中究竟有什么内容?

0 投票
1 回答
248 浏览

python - 使用检查点数据框覆盖表失败并出现 FileNotFoundException

df在 pySpark 中有一些数据框,它来自于调用:

我想org_table在我的脚本末尾覆盖。由于禁止覆盖输入表,我检查了我的数据:

血统现在应该被打破,我也可以用checkpointed.show()(作品)查看我的检查点数据。什么不起作用是写表:

这会导致错误:

引起:java.io.FileNotFoundException:文件不存在:hdfs://org_table_path/org_table/part-00081-4e9d12ea-be6a-4a01-8bcf-1e73658a54dd-c000.snappy.parquet

我已经尝试了几件事,比如在写作之前刷新 org_table 等,但我在这里感到困惑。我该如何解决这个错误?

0 投票
0 回答
176 浏览

amazon-emr - 如何使用 EMRFS 通过结构化流进行检查点?

我一直在使用 S3 通过结构化流进行检查点。但是,我得到了与 S3 中的最终一致性相关的 FileNotFound 异常。

以下是我目前拥有的 S3 检查点。

我计划在 EMR 中运行我的 spark 作业时切换到 EMRFS。

EMRFS 的可靠性如何以及如何使用 EMRFS 进行检查点?

我们实施检查点的方式会发生变化吗?

如何在 EMR 中启用 EMRFS?

0 投票
1 回答
699 浏览

scala - 检查点/持久化/改组似乎不会“短路”rdd的血统,如“学习火花”一书中详述

在学习 Spark 时,我阅读了以下内容:

除了流水线之外,如果现有的 RDD 已经保存在集群内存或磁盘上,Spark 的内部调度程序可能会截断 RDD 图的沿袭。在这种情况下,Spark 可以“短路”并根据持久化的 RDD 开始计算。可能发生这种截断的第二种情况是,当 RDD 已经作为早期 shuffle 的副作用实现时,即使它没有显式地 persist()ed。这是一个底层优化,它利用了 Spark shuffle 输出被写入磁盘的事实,并利用了 RDD 图的许多部分被重新计算的事实。

所以,我决定尝试用一个简单的程序(如下)来看看这个:

阅读 Spark 书中的上述段落后,我没有看到我的预期。每次调用此方法时,我都看到了完全相同的 toDebugString 输出——每次都指示两个阶段(我原本预计在检查点应该截断沿袭之后只有一个阶段。),如下所示:

我想知道我忽略的关键问题是否可能是“可能”这个词,如“时间表可能会截断血统”。在其他情况下,考虑到我上面编写的相同程序,这种截断是否可能发生?还是我写的小程序没有做正确的事情来强制截断血统?提前感谢您提供的任何见解!