1

我从cassandra取一些数据到spark,当数据足够大并且无法一次缓存在内存中时,我必须使用spark.cassandra.input.split.size_in_mb来设置机器一次可以获得多大的数据。但是我还想使用 缓存数据更多时间,代码如下:

val conf = new SparkConf().setAppName("CassandraLogAnalyse")
  .set("spark.cassandra.connection.host", "xxx")
  .set("spark.cassandra.auth.username", "xxx")
  .set("spark.cassandra.auth.password", "xxx")
  .set("spark.cassandra.input.split.size_in_mb",'512')
//Select Data from cassandra
val sc = new SparkContext(conf)

val loggly_http_in = sc.cassandraTable("loggly", "http_in").select("uid", "cjj_id", "request_uri", "request_body").where("app_context = ? and log_time > ?", "news", batch_time)

loggly_http_in.cache()

val rdd1 = loggly_http_in.map(...).filter(...)......
val rdd2 = loggly_http_in.map(...).filter(...)......

这是正确的吗?如果正确,它是如何工作的?当它出错时,正确的方法是什么?

4

1 回答 1

1

spark.cassandra.input.split.size_in_mb设置与缓存无关。这个设置决定了每个 Spark 分区的大小。如果您将其设置得太大,您可能会获得太少的任务,并且您的某些节点可能仍然未使用。如果您将其设置得太低,您将从任务调度中获得更多开销。

Spark 可以缓存一个 RDD 的多个分区(以及多个 RDD)。因此,当您调用 时cache(),这将尝试缓存尽可能多的 RDD 分区,因为它可以找到可用内存。如果您需要的缓存比实际缓存更多,唯一的方法是为您的应用程序分配更多的 Spark 集群内存。

你的使用cache看起来不错。

不要忘记您还可以缓存任何转换后的 RDD。例如,在过滤后缓存 RDD 可能比缓存从 Cassandra 获取的原始 RDD 需要更少的内存。

于 2015-12-07T13:02:52.887 回答