1

我一直在尝试学习如何使用 Apache Spark,并且在尝试对 Cassandra 列中的所有值求和时遇到问题(使用 datastax spark-cassandra-connector)。我尝试的一切都会导致java.lang.OutOfMemoryError: Java heap space

这是我提交给火花大师的代码:

object Benchmark {
  def main( args: Array[ String ] ) {
    val conf    = new SparkConf()
                  .setAppName( "app" )
                  .set( "spark.cassandra.connection.host", "ec2-blah.compute-1.amazonaws.com" )
                  .set( "spark.cassandra.auth.username", "myusername" )
                  .set( "spark.cassandra.auth.password", "mypassword" )
                  .set( "spark.executor.memory", "4g" )
    val sc      = new SparkContext( conf )
    val tbl     = sc.cassandraTable( "mykeyspace", "mytable" )
    val res     = tbl.map(_.getFloat("sclrdata")).sum()

    println( "sum = " + res )
  }
}

现在,我的集群中只有一个 Spark 工作节点,而且鉴于表的大小,绝对有可能并非所有这些节点都可以同时放入内存中。但是我不认为这会是一个问题,因为 spark 应该懒惰地评估命令,并且对列中的所有值求和不需要让整个表一次驻留在内存中。

我是这个主题的新手,所以任何关于为什么这不起作用的澄清或关于如何正确地做到这一点的帮助将不胜感激。

谢谢

4

1 回答 1

1

也许 spark 正在将整个表构建为单个内存分区,以便它可以对其进行映射操作。

我认为 spark 应该溢出到磁盘而不是抛出 OutOfMemoryExceptions,但如果只有一个分区,它可能无法溢出。我在这里看到了一个类似的问题,他通过指定这样的拆分大小来解决它:

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

所以尝试在你的 conf 中设置 spark.cassandra.input.split.size_in_mb。

我想这将允许 spark 总结表的块,然后在它需要新块的空间时从内存中逐出这些块。

您可以研究的另一件事是为表 RDD 指定一个存储级别,以允许它溢出到磁盘。我认为您可以通过添加“.persist(StorageLevel.MEMORY_AND_DISK)”来做到这一点。默认值似乎是 MEMORY_ONLY。在 RDD 持久性部分中查看有关存储级别的更多信息

于 2015-07-10T13:12:08.017 回答