
As part of my project I have to build an SQL query interface for a very large Cassandra dataset, so I have been looking at different ways of executing SQL queries on Cassandra column families using Spark, and I have come up with three different approaches:

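All three variants share the same connection constants and the list of application jars, which I have left out of the snippets. A minimal sketch of what those definitions look like, purely for context (the host, keyspace, table name and jar path below are placeholders, not my actual values):

    // placeholder values only; the real host, keyspace, table and jar locations differ
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_KEYSPACE = "my_keyspace";
    private static final String CASSANDRA_COLUMN_FAMILY = "my_table";
    private static final String HOST = "spark://127.0.0.1:7077";   // Spark master URL
    private static final String APP_NAME = "cassandra-sql-interface";
    private static final String[] jars = { "target/app-with-dependencies.jar" };
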
  1. Using a Spark SQLContext with a statically defined schema

    // statically defined in the application
    public static class TableTuple implements Serializable {
        private int id;
        private String line;
    
        TableTuple (int i, String l) {
            id = i;
            line = l;
        }
    
        // getters and setters (needed for Spark's JavaBean-based schema inference in createDataFrame)
        ...
    }
    

    and the invocation is defined as:

    SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host", CASSANDRA_HOST)
            .setJars(jars);
    
    SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
    SQLContext sqlContext = new SQLContext(sc);
    
    // read the column family as an RDD of CassandraRow and map each row onto the statically defined bean
    JavaRDD<CassandraRow> rowrdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
    JavaRDD<TableTuple> rdd = rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1)));
    
    // let Spark infer the schema from the TableTuple bean and register the result as a temp table
    DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class);
    dataFrame.registerTempTable("lines");
    
    DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1");
    
    System.out.println(Arrays.asList(resultsFrame.collect()));
    
  2. Using a Spark SQLContext with a dynamically defined schema

    SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host", CASSANDRA_HOST)
            .setJars(jars);
    
    SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
    SQLContext sqlContext = new SQLContext(sc);
    
    JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
    // map each CassandraRow to a generic Spark SQL Row
    JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1)));
    
    // build the schema programmatically at runtime
    List<StructField> fields = new ArrayList<>();
    fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
    fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);
    
    // apply the schema to the RDD of Rows and register the result as a temp table
    DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema);
    dataFrame.registerTempTable("lines");
    
    DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1");
    
    System.out.println(Arrays.asList(resultDataFrame.collect()));
    
  3. Using the CassandraSQLContext from the spark-cassandra-connector

    SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host", CASSANDRA_HOST)
            .setJars(jars);
    
    SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
    
    // the connector's SQL context resolves keyspace.table references directly, without registering a temp table
    CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
    DataFrame resultsFrame = sqlContext.sql("Select line from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY + " where id = 1");
    
    System.out.println(Arrays.asList(resultsFrame.collect()));
    

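For completeness, all three snippets assume roughly the following imports, based on Spark 1.x and the Java API of the DataStax spark-cassandra-connector (exact packages may differ slightly between versions):

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.cassandra.CassandraSQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import com.datastax.spark.connector.japi.CassandraRow;
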
I would like to know the advantages and disadvantages of each approach relative to the others. Also, for the CassandraSQLContext approach, are queries limited to CQL, or is it fully compatible with Spark SQL? Finally, I would like an analysis pertaining to my particular use case: I have a Cassandra column family with roughly 17.6 million tuples of 62 columns each. Which approach is best suited for querying a database of this size?
