2

我的程序中有一个长期运行的迭代,我想每隔几次迭代缓存和检查点(建议使用这种技术来减少网络上的长沿袭)所以我不会有 StackOverflowError,这样做

for (i <- 2 to 100) {
      //cache and checkpoint ever 30 iterations
      if (i % 30 == 0) {
        graph.cache
        graph.checkpoint
        //I use numEdges in order to start the transformation I need
        graph.numEdges
      }
      //graphs are stored to a list
      //here I use the graph of previous iteration to this iteration
      //and perform a transformation
}

我已经像这样设置了检查点目录

val sc = new SparkContext(conf)
sc.setCheckpointDir("checkpoints/")

但是,当我最终运行我的程序时,我得到了一个异常

Exception in thread "main" org.apache.spark.SparkException: Invalid checkpoint directory

我使用 3 台计算机,每台计算机都有 Ubuntu 14.04,并且我还在每台计算机上使用带有 hadoop 2.4 或更高版本的 spark 1.4.1 的预构建版本。

4

2 回答 2

4

如果您已经在节点集群上设置了 HDFS,您可以在目录中的“core-site.xml”中找到您的 hdfs 地址HADOOP_HOME/etc/hadoop。对我来说,core-site.xml 设置为:

<configuration>
      <property>
           <name>fs.default.name</name>
           <value>hdfs://master:9000</value>
      </property>
</configuration>

然后你可以在hdfs上创建一个目录来保存Rdd checkpoint文件,我们把这个目录命名为RddChekPoint,通过hadoop hdfs shell:

$ hadoop fs -mkdir /RddCheckPoint

如果使用 pyspark,SparkContext 初始化后sc = SparkContext(conf),可以通过设置 checkpoint 目录

sc.setCheckpointDir("hdfs://master:9000/RddCheckPoint")

当一个 Rdd 被检查点时,在 hdfs 目录 RddCheckPoint 中,你可以看到检查点文件保存在那里,看看:

$ hadoop fs -ls /RddCheckPoint
于 2016-10-24T15:41:01.440 回答
2

检查点目录需要是 HDFS 兼容目录(来自 scala 文档“HDFS 兼容目录,其中检查点数据将被可靠地存储。请注意,这必须是容错文件系统,如 HDFS”)。因此,如果您在这些节点上设置了 HDFS,请将其指向“hdfs://[yourcheckpointdirectory]”。

于 2015-09-05T09:07:19.950 回答