问题标签 [spark-checkpoint]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4689 浏览

scala - 如何在 Spark Scala 中读取检查点数据框

我正在尝试测试下面的程序以获取检查点并从检查点位置读取如果应用程序由于资源不可用等任何原因而失败。当我终止作业并再次重新触发它时,执行将从头开始。不知道实现这一目标还需要什么。谢谢 !!

下面是代码:

0 投票
1 回答
1153 浏览

apache-spark - 如何将 Spark Streaming 检查点位置存储到 S3 中?

我对获取 S3 parquet 数据并将 parquet 数据写入 S3 的 Spark Streaming 应用程序 (Spark v2.3.2) 感兴趣。应用程序的数据帧流使用groupByKey()flatMapGroupsWithState()使用GroupState.

是否可以将其配置为使用s3检查点位置?例如:

我确认以上是能够成功将数据写入s3DataDestination.

但是,写入 s3 检查点位置时会引发异常:

这需要自定义实现 S3StateStoreProvider吗?或者,检查点位置是否需要写入 HDFS?

0 投票
1 回答
956 浏览

apache-kafka - 如何清理 Spark 结构化流中积累的检查点文件?

我为 SparkContext 添加了检查点,并为长期 Spark 结构化流作业的 kafka 数据流编写查询。

spark作业运行稳定。但是,我注意到检查点文件在 HDFS 和 S3 中累积,没有自动清理。我看到存储空间不断被这些文件吃掉。是否有某种方法可以配置这些检查点文件的保留时间以使其自动删除?还是我需要运行一些 cron 作业来手动删除它们?如果我手动删除它们,是否会影响正在进行的 Spark 作业?谢谢!

0 投票
0 回答
81 浏览

apache-spark-sql - Databricks Azure 中的结构化流式处理引发异常 - java.lang.IllegalStateException:读取增量文件 dbfs 时出错:/raw_zone/1.delta

我们在 Databricks 环境中使用结构化流,每次我们运行这个程序时 - kAFKA - 结构化流(DBR6.6,Spark 2.4.5) - 写入 CosmosDB,在我们执行最终操作之前,我们都会遇到与下面相同的异常连接以将数据保存到 Cosmos DB。我们没有修改任何 spark 特定设置并利用默认 spark /DBR 配置。

0 投票
1 回答
143 浏览

apache-spark - spark如何计算给定窗口间隔的窗口开始时间?

考虑我有一个带有时间戳字段列的输入 df,并且将窗口持续时间(没有滑动间隔)设置为:

10 分钟

输入的时间(2019-02-28 22:33:02)
窗口形成为(2019-02-28 22:30:02) to (2019-02-28 22:40:02)

8 分钟

与形成的时间(2019-02-28 22:33:02)
窗口相同的输入是(2019-02-28 22:26:02) to (2019-02-28 22:34:02)

5分钟

与形成的时间(2019-02-28 22:33:02)
窗口相同的输入是(2019-02-28 22:30:02) to (2019-02-28 22:35:02)

14 分钟

输入的时间(2019-02-28 22:33:02)
窗口形成为(2019-02-28 22:32:02) to (2019-02-28 22:46:02)


所以,我的问题是:

spark如何计算给定输入 ts 的窗口的开始时间?

0 投票
1 回答
267 浏览

scala - Apache Spark中没有水印的滑动窗口?

考虑到我有一个简单的聚合,其中定义了一个没有任何水印的窗口。

这里就像我们的窗口一样30 minutes,滑动间隔为10 minutes

  • Q1。这是否意味着10分钟后,它会滑动?
  • Q2。如果是这样,那是不是有点类似于水印?
0 投票
1 回答
298 浏览

apache-spark - Azure blob 存储的 Spark 流检查点问题:TaskCompletionListener 中的错误为 null

我正在使用 spark 结构化流的检查点功能,并将检查点元数据存储为 azure blob。

但是我遇到了错误,从日志看来它正在删除临时文件并尝试再次访问它。

以下是详细日志和任务计划

日志

逻辑计划

Jar 版本 azure-storage - 8.4.0,hadoop-azure - 2.9.2

0 投票
1 回答
220 浏览

apache-spark - Spark 结构在 S3 上使用检查点流式传输过多线程

  • 火花 3.0.1
  • hadoop-aws 3.2.0

我有一个简单的 spark 流应用程序,它从 Kafka 主题读取消息,将它们聚合并写入 Elasticsearch。我正在使用检查点和 S3 存储桶来存储它们。

一段时间后,应用程序开始失败,但出现以下异常:

VisualVM 显示,线程数量从一开始就上升,直到达到最大值(~4.8K): image

其中大多数是:

  • s3a-transfer-unbounded-poolXXX-tXX
  • s3a-transfer-shared-poolXXX-tXX

据我了解,创建这些线程池的唯一地方是

org.apache.hadoop.fs.s3a.S3AFileSystem#initialize

Spark 每次都会创建新的文件系统

org.apache.spark.sql.execution.streaming.StreamMetadata#write

叫做。

为什么会这样?如何防止创建此线程?

0 投票
0 回答
29 浏览

dataframe - Spark 检查点导致连接问题

我有一段代码基本上执行以下操作:

此时,我收到以下错误:

已解决的属性 UL#28099 在运算符 !Project [ SEQ_ID#27907, TOOL_ID#27908, TIME_STAMP#27909, DATE#27910, RESULT#27911, UL#28099, cast(CASE WHEN isnull(LL#27913) THEN -Infinity ELSE LL#27913 END as double) AS LL#27246, UW#27914,LW#27915]。具有相同名称的属性出现在操作中:UL。请检查是否使用了正确的属性。;;\n加入 FullOuter\n

但是,当我删除checkpoint并像正常缓存的数据帧一样运行它时,它可以正常工作。如果我的数据集很小,这没问题,但我需要检查点,因为与可用的 EMR 资源相比,我的数据集非常大。

有没有人遇到过类似的问题?

0 投票
0 回答
19 浏览

python - 如何加载检查点 pyspark 数据框

我下面的代码崩溃了,而不是从头开始,我想从最后一个检查点数据帧开始。我怎样才能加载它?我的目录中有这个文件夹/tmp/53af5ba0-4419-4ab9-93c0-e5f69fd1c8eb