8

想象以下情景:Spark 应用程序(Java 实现)正在使用 Cassandra 数据库来加载、转换为 RDD 并处理数据。此外,该应用程序正在从数据库中传输新数据,这些数据也由自定义接收器处理。流式处理的输出存储在数据库中。该实现使用与数据库集成的 Spring Data Cassandra。

卡桑德拉配置:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main 方法:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

预计初始加载会有大量数据。出于这个原因,数据在 rddBuffer 中被分页、加载和分发。

还有以下可用选项:

  1. Spark-Cassandra 示例(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala),尽管文档数量最少对于这个例子。
  2. Calliope 项目 ( http://tuplejump.github.io/calliope/ )

我想知道将 Spark 与 Cassandra 集成的最佳实践是什么。在我的实施中遵循的最佳选择是什么?

阿帕奇星火1.0.0,阿帕奇卡桑德拉2.0.8

4

2 回答 2

9

使用 Cassandra 和 Spark 最简单的方法是使用 DataStax 开发的 Spark 官方开源 Cassandra 驱动程序:https ://github.com/datastax/spark-cassandra-connector

该驱动程序构建在 Cassandra Java 驱动程序之上,并提供了 Cassandra 和 Spark 之间的直接桥梁。与 Calliope 不同,它不使用 Hadoop 接口。此外,它还提供以下独特功能:

  • 支持所有 Cassandra 数据类型,包括集合,开箱即用
  • Cassandra 行到自定义类或元组的轻量级映射,无需使用 Scala 中的任何隐式或其他高级功能
  • 将任何 RDD 保存到 Cassandra
  • 完全支持 Cassandra 虚拟节点
  • 在服务器端过滤/选择的能力,例如利用 Cassandra 集群列或二级索引
于 2014-06-30T19:36:23.953 回答
1

上面代码中的方法是一种经典的集中式算法,只有在一个节点中执行才能工作。Cassandra 和 Spark 都是分布式系统,因此有必要对流程进行建模,使其可以分布在多个节点之间。

可能的方法很少:如果您知道要获取的行的键,则可以执行以下简单操作:(使用 DataStax Java 驱动程序)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

这将有效地在 Spark 集群中分配键的获取。请注意与 C* 的连接是如何在闭包内完成的,因为这样可以确保在每个单独的分布式工作人员上执行任务时建立连接。

鉴于您的示例使用通配符(即密钥未知),使用 Cassandra 的 Hadoop 接口是一个不错的选择。问题中链接的 Spark-Cassandra 示例说明了在 Cassandra 上使用此 Hadoop 接口。

Calliope 是一个库,它通过提供一个简单的 API 来访问该功能,从而封装了使用 Hadoop 接口的复杂性。它仅在 Scala 中可用,因为它使用特定的 Scala 功能(例如即将发布的版本中的隐式和宏) 使用 Calliope,您基本上可以声明如何将 RDD[type] 转换为行键和行值,并且 Calliope 负责配置hadoop 与作业的接口。我们发现 Calliope(和底层的 hadoop 接口)比使用驱动程序与 Cassandra 交互快 2-4 倍。

结论:我会放弃 Spring-Data 配置来访问 Cassandra,因为这会将您限制为单个节点。如果可能,考虑一个简单的并行访问,或者在 Scala 中使用 Calliope 进行探索。

于 2014-06-28T22:49:51.103 回答