1

作为 apache spark 的新手,在 Spark 上获取 Cassandra 数据时遇到了一些问题。

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

此查询未过滤 cassandra 服务器上的数据。虽然这个 java 语句正在执行它的内存并最终抛出 spark java.lang.OutOfMemoryError 异常。查询应该过滤掉 cassandra 服务器上的数据,而不是https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md中提到的客户端。

当我在 cassandra cqlsh 上使用过滤器执行查询时,它的性能很好,但是在没有过滤器(where 子句)的情况下执行查询会给出预期的超时。所以很明显spark没有在客户端应用过滤器。

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

为什么在客户端应用过滤器以及如何改进在服务器端应用过滤器。

我们如何在 windows 平台上的 cassandra 集群之上配置 spark 集群?

4

3 回答 3

2

没有使用 Cassandra 和 Spark,通过阅读您提供的部分(谢谢)我看到:

注意:虽然 ALLOW FILTERING 子句隐式添加到生成的 CQL 查询中,但 Cassandra 引擎当前并不允许所有谓词。这个限制将在未来的 Cassandra 版本中得到解决。目前,ALLOW FILTERING 适用于由二级索引或集群列索引的列。

我很确定(但尚未测试)不支持“IN”谓词:请参阅https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java /src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

因此,您可以尝试将 where 子句限制为 Id(假设有一个二级索引),并对日期范围使用 spark 过滤。

于 2015-06-30T17:57:04.093 回答
1

我建议将表格作为 DataFrame 而不是 RDD 读取。这些在 Spark 1.3 及更高版本中可用。然后您可以将 CQL 查询指定为这样的字符串:

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);

String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
DataFrame resultsFrame = sqlContext.sql(query);

System.out.println(resultsFrame.count());

所以尝试一下,看看它是否更适合你。

在 DataFrame 中获得数据后,您可以在其上运行 Spark SQL 操作。如果您想要 RDD 中的数据,您可以将 DataFrame 转换为 RDD。

于 2015-07-01T00:03:01.553 回答
1

在 SparkConfing 中设置 spark.cassandra.input.split.size_in_mb 解决了这个问题。

conf = new SparkConf();
        conf.setAppName("Test");
        conf.setMaster("local[4]");
        conf.set("spark.cassandra.connection.host", "192.168.1.15").
        set("spark.executor.memory", "2g").
        set("spark.cassandra.input.split.size_in_mb", "67108864");

Spark-cassnadra-connector 读取 spark.cassandra.input.split.size_in_mb 的错误值,因此在 SparkConf 中覆盖该值即可。现在 IN 子句也运行良好。

于 2015-07-01T06:39:14.347 回答