我一直在为我的项目使用 Datastax 提供的 Apache Cassandra (v2.x)。我正在使用 Datasatx API 创建一个 ColumnFamily,如下所示:
//Create cluster
Cluster cluster = Cluster.builder().addContactPoint(hostNameOrIp)
//Get session
Session session = cluster.connect();
//create keyspace using session
session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': %d}",
QueryBuilder.quote("MY_KS"),
1)
);
String tableQuery = "CREATE TABLE timeline2 (
key varchar,
open float,
high float,
low float,
close float,
volume int,
adjusted float,
dtime timestamp,
PRIMARY KEY (key, dtime)
)";
//create columnFamily using session
ResultSet result = session.execute(tableQuery);
我现在被要求从 Datastax 提供的 Cassandra 转移到 Apache Cassandra (v2.x) 的普通风味,并使用 Hector API 做同样的事情。
但是我一直无法在 Hector 中找到类似的 API。我到目前为止所做的如下:
Map<String, String> accessMap = new HashMap<String, String>();
accessMap.put("username", username);
accessMap.put("password", password);
Cluster cluster = HFactory.getOrCreateCluster("TEST_CLUSTER", new CassandraHostConfigurator(cassandraUrl), accessMap);
ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition("MY_KS", ComparatorType.BYTESTYPE);
KeyspaceDefinition newKeyspaceDef = HFactory.createKeyspaceDefinition("MY_KS", ThriftKsDef.DEF_STRATEGY_CLASS, 1, Arrays.asList(cfDef));
//Add the schema to the cluster.
//"true" as the second param means that Hector will block until all nodes see the change.
cassandraCluster.addKeyspace(newKeyspaceDef, true);
Keyspace ksp = HFactory.createKeyspace("MY_KS", cassandraCluster);
我现在被困在这一点上。我在 Hector 中找不到 API,在那里我可以像使用 Datastax API 一样为 CREATE TABLE 提供一个简单的查询字符串(即,通过提供简单的 CQL),我确实在互联网上探索了各种其他选项,但找不到一个简单的解决方案。我在 Hector WIKI 上看到的一个选项是使用 ColumnFamilyTemplate。我看到的其他选项是使用 BasicColumnDefinition。我的另一个选择是使用 Mutator.insert() 操作。
但是这些解决方案都不够清楚我将如何定义我的表列(又名列族)的“数据类型”。
此外,关于序列化器(StringSearlizer 等)和比较器到底是什么,没有足够清晰的指导/API 详细信息。
有人可以帮我解决这个问题吗?我的总目标是在 Hector 中搜索可以接受简单 CQL 查询并执行它们的 API(就像使用 Datastax API 一样)。
@亚历克斯波佩斯库
谢谢澄清,我现在明白了。
我现在修改了我的客户端,如下所示:
//This will give a connection to the cluster
Cluster cassandraCluster = connectApacheCassandra();
ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition("TEST_KS", "TEST_CF",
ComparatorType.BYTESTYPE);
KeyspaceDefinition newKeyspaceDef = HFactory.createKeyspaceDefinition("TEST_KS", ThriftKsDef.DEF_STRATEGY_CLASS, 1, Arrays.asList(cfDef));
cassandraCluster.addKeyspace(newKeyspaceDef, true);
Keyspace ksp = HFactory.createKeyspace("TEST_KS", cassandraCluster);
BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition(cfDef);
cassandraCluster.addKeyspace(newKeyspaceDef, true);
Keyspace ksp = HFactory.createKeyspace("TEST_KS", cassandraCluster);
BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition(cfDef);
BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("aKey"));
columnDefinition.setIndexName("key_idx1");
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.LONGTYPE.getClassName());
columnFamilyDefinition.addColumnDefinition(columnDefinition);
columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("aTestColumn"));
columnDefinition.setValidationClass(ComparatorType.LONGTYPE.getClassName());
columnFamilyDefinition.addColumnDefinition(columnDefinition);
cassandraCluster.updateColumnFamily(new ThriftCfDef(columnFamilyDefinition));
我现在使用 cqlsh 查看查询 DESCRIBE COLUMNFAMILY "TEST_CF" 的输出,我得到以下输出:
CREATE TABLE "TEST_CF" (
key blob,
column1 blob,
"614b6579" bigint,
"6154657374436f6c756d6e" bigint,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=1.000000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='NONE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};
我无法理解这个输出。我在此输出中看不到“aKey”和“aColumn”列。输出如何将列名显示为“key”、“column1”等(我从未在我的代码中提到它们)。此外,我无法理解此输出中显示的数据类型。
我的期望是输出如下:
CREATE TABLE TEST_CF (
aKey varchar,
aColumn varchar
PRIMARY KEY (aKey )
)";
您能否指出我在 Hector API 中犯了什么错误,以至于我没有得到预期的输出?另外,如果我希望列数据类型不是 varchar(比如 float);我应该在我的代码中做些什么改变?