I have a very large MySQL table (billions of rows, with dozens of columns) that I want to convert into a ColumnFamily in Cassandra. I'm using Hector.
I first create my schema like this:
// connect to the cluster over Thrift
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String newKeyspaceName = "KeyspaceName";
String newColumnFamilyName = "CFName";

ThriftCluster cassandraCluster;
CassandraHostConfigurator cassandraHostConfigurator;

cassandraHostConfigurator = new CassandraHostConfigurator(host);
cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);

// define the column family: UTF-8 row keys, UTF-8 column names, UTF-8 default values
BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
columnFamilyDefinition.setName(newColumnFamilyName);
columnFamilyDefinition.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);

// secondary (KEYS) index on the "id" column, validated as an integer
BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
columnDefinition.setIndexName("id_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);

// secondary (KEYS) index on the "status" column, validated as ASCII
columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
columnDefinition.setIndexName("status_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);

.......

// create the keyspace (SimpleStrategy, replication factor 1) with the CF above
ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);
KeyspaceDefinition keyspaceDefinition =
    HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));
cassandraCluster.addKeyspace(keyspaceDefinition);
Once that's done, I load the data, which I keep in a list, since I fetch the MySQL rows with Spring's NamedParameterJdbcTemplate.
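For context, here is a minimal sketch of how that list might be produced. The table name, the column list, and the key-based paging are hypothetical; only the Spring classes (NamedParameterJdbcTemplate, MapSqlParameterSource, RowMapper) are real:

// Hypothetical fetch: page through the source table by primary key rather
// than OFFSET, which degrades badly on a table with billions of rows.
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

public List<SqlParameterSource> fetchChunk(NamedParameterJdbcTemplate jdbc, long lastId, int chunkSize) {
    MapSqlParameterSource params = new MapSqlParameterSource()
            .addValue("lastId", lastId)
            .addValue("limit", chunkSize);
    return jdbc.query(
            "SELECT id, status /* , other columns */ FROM big_table WHERE id > :lastId ORDER BY id LIMIT :limit",
            params,
            new RowMapper<SqlParameterSource>() {
                public SqlParameterSource mapRow(ResultSet rs, int rowNum) throws SQLException {
                    // one SqlParameterSource per MySQL row
                    return new MapSqlParameterSource()
                            .addValue("id", rs.getInt("id"))
                            .addValue("status", rs.getString("status"));
                }
            });
}

The insert code itself looks like this: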
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();

public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {
    Keyspace workingKeyspace = null;
    Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
    workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
    Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);
    // note: this template is created but never used in this method
    ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);

    long t1 = System.currentTimeMillis();
    for (SqlParameterSource data : dataToInsert) {
        String keyId = "id" + (Integer) data.getValue("id");
        // queue one column insertion per MySQL column, all under the same row key
        mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
        mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));
        ...............
    }
    mutator.execute(); // everything is shipped in one giant batch at the end
    System.out.println(System.currentTimeMillis() - t1); // elapsed time in ms
}
I'm inserting about 100,000 rows in roughly one hour, which is really slow. I've heard about multithreading my inserts, but in this particular case I don't know how to go about it. Should I use BatchMutate?
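To make the question concrete, here is the kind of batched flush I have in mind. This is just a sketch, not a verified fix: BATCH_SIZE is an arbitrary number I made up, and I'm assuming a Hector Mutator can be reused after execute() (its Javadoc says pending mutations are cleared on execution), so each execute() ships a few hundred rows instead of all 100,000 at once. It reuses the same fields (clusterName, host, KeyspaceName, ColumnFamilyName, serializer) as my snippet above:

// Sketch of batched flushing; BATCH_SIZE is hypothetical and needs tuning.
private static final int BATCH_SIZE = 500;

public void insertBatched(List<SqlParameterSource> dataToInsert) {
    Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
    Keyspace workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
    Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);

    int pending = 0;
    for (SqlParameterSource data : dataToInsert) {
        String keyId = "id" + (Integer) data.getValue("id");
        mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
        mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));
        // ... remaining columns, as above ...
        if (++pending % BATCH_SIZE == 0) {
            mutator.execute(); // flush this batch; pending mutations are cleared
        }
    }
    mutator.execute(); // flush whatever is left at the end
}

For the multithreading part, I imagine splitting dataToInsert into chunks and handing each chunk to a worker thread with its own Mutator, but I don't know whether that is the recommended pattern with Hector, hence the question.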