我从cassandra取一些数据到spark,当数据足够大并且无法一次缓存在内存中时,我必须使用spark.cassandra.input.split.size_in_mb来设置机器一次可以获得多大的数据。但是我还想使用 缓存数据更多时间,代码如下:
val conf = new SparkConf().setAppName("CassandraLogAnalyse")
.set("spark.cassandra.connection.host", "xxx")
.set("spark.cassandra.auth.username", "xxx")
.set("spark.cassandra.auth.password", "xxx")
.set("spark.cassandra.input.split.size_in_mb",'512')
//Select Data from cassandra
val sc = new SparkContext(conf)
val loggly_http_in = sc.cassandraTable("loggly", "http_in").select("uid", "cjj_id", "request_uri", "request_body").where("app_context = ? and log_time > ?", "news", batch_time)
loggly_http_in.cache()
val rdd1 = loggly_http_in.map(...).filter(...)......
val rdd2 = loggly_http_in.map(...).filter(...)......
这是正确的吗?如果正确,它是如何工作的?当它出错时,正确的方法是什么?