1

我有一个非常大的 MySQL 表(数十亿行,有几十列)我想在 Cassandra 中转换为 ColumnFamily。我正在使用赫克托。

我首先这样创建我的架构:

    String clusterName = "Test Cluster";
    String host = "cassandra.lanhost.com:9160";
    String newKeyspaceName = "KeyspaceName";
    String newColumnFamilyName = "CFName";

    ThriftCluster cassandraCluster;
    CassandraHostConfigurator cassandraHostConfigurator;

    cassandraHostConfigurator = new CassandraHostConfigurator(host);
    cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);

    BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
    columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
    columnFamilyDefinition.setName(newColumnFamilyName);    
    columnFamilyDefinition.setDefaultValidationClass("UTF8Type");
    columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
    columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);

    BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
    columnDefinition.setIndexName("id_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

    columnDefinition = new BasicColumnDefinition();
    columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
    columnDefinition.setIndexType(ColumnIndexType.KEYS);
    columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
    columnDefinition.setIndexName("status_index");
    columnFamilyDefinition.addColumnDefinition(columnDefinition);

        .......

    ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);

    KeyspaceDefinition keyspaceDefinition = 
        HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));

    cassandraCluster.addKeyspace(keyspaceDefinition);

完成后,我加载存储在列表中的数据,因为我使用 namedParametersJdbcTemplate 获取 MySQL 数据,如下所示:

String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();

public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {

    Keyspace workingKeyspace = null;
    Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
    workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
    Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);

    ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);

    long t1 = System.currentTimeMillis();

    for (SqlParameterSource data : dataToInsert) {

        String keyId = "id" + (Integer) data.getValue("id");

    mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
    mutator.addInsertion(keyId,ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));

          ...............

    }

    mutator.execute();

    System.out.println(t1 - System.currentTimeMillis());

我在大约 1 小时内插入 100 000 行,这真的很慢。我听说过多线程我的插入,但在这种特殊情况下,我不知道该怎么做。我应该使用 BatchMutate 吗?

4

2 回答 2

1

有一种替代方法可以实现这一目标。您可以尝试探索https://github.com/impetus-opensource/Kundera。你会喜欢的。

Kundera 是适用于 NoSQL 数据存储的符合 JPA 2.0 的对象数据存储映射库,目前支持 Cassandra、HBase、MongoDB 和所有关系数据存储(Kundera 内部对所有关系数据存储使用 Hibernate)。

在您的情况下,您可以使用现有对象以及 JPA 注释将它们存储在 Cassandra 中。由于 Kundera 支持多语言持久性,因此您还可以使用 MySQL + Cassandra 组合,您可以将 MySQL 用于大部分数据,将 Cassandra 用于事务数据。由于您只需要关心对象和 JPA 注释,因此您的工作会容易得多。

对于性能,您可以查看https://github.com/impetus-opensource/Kundera/wiki/Kundera-Performance

于 2011-12-16T11:51:18.903 回答
1

是的,您应该从多个线程运行插入代码。查看以下压力测试代码,了解如何使用 hector 有效地执行此操作: https ://github.com/zznate/cassandra-stress

插入性能问题的另一个来源可能是您在列族上应用的二级索引的数量(每个二级索引在“幕后”创建一个额外的列族)。

正确设计的数据模型不应该真的需要大量的二级索引。以下文章很好地概述了 Cassandra 中的数据建模: http ://www.datastax.com/docs/1.0/ddl/index

于 2011-12-16T18:22:13.423 回答