作为我项目的一部分,我必须为一个非常大的 Cassandra 数据集创建一个 SQL 查询接口,因此我一直在研究使用 Spark 对 cassandra 列族执行 SQL 查询的不同方法,我想出了 3 种不同的方法
使用带有静态定义模式的 Spark SQLContext
// statically defined in the application public static class TableTuple implements Serializable { private int id; private String line; TableTuple (int i, String l) { id = i; line = l; } // getters and setters ... }
我将定义定义为:
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<CassandraRow> rowrdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); JavaRDD<TableTuple> rdd = rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1))); DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class); dataFrame.registerTempTable("lines"); DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1"); System.out.println(Arrays.asList(resultsFrame.collect()));
将 Spark SQLContext 与动态定义的模式一起使用
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1))); List<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("line", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema); dataFrame.registerTempTable("lines"); DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1"); System.out.println(Arrays.asList(resultDataFrame.collect()));
使用来自 spark-cassandra-connector 的 CassandraSQLContext
SparkConf conf = new SparkConf(true) .set("spark.cassandra.connection.host", CASSANDRA_HOST) .setJars(jars); SparkContext sc = new SparkContext(HOST, APP_NAME, conf); CassandraSQLContext sqlContext = new CassandraSQLContext(sc); DataFrame resultsFrame = sqlContext.sql("Select line from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY + " where id = 1"); System.out.println(Arrays.asList(resultsFrame.collect()));
我想知道一种方法相对于另一种方法的优缺点。此外,对于该CassandraSQLContext
方法,查询仅限于 CQL,还是与 Spark SQL 完全兼容。我还想要一个与我的特定用例有关的分析,我有一个 cassandra 列族,其中包含 62 列的约 1760 万个元组。对于查询这么大的数据库,哪种方法最合适?