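We want to test that if a column has a TTL (time-to-live) attribute, it is eventually removed from Cassandra entirely, together with the empty row that contained it.
As I understand it, the algorithm for testing this behavior is: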
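- when saving the object, set a TTL on the column
- wait for the TTL to elapse, then check that the returned value is null
- wait for the GC_GRACE_SECONDS period to pass
- check that the row has been removed as well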
I have not managed to check the last item.
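The steps above can be sketched as a toy in-memory model. This is only an illustration of the lifecycle as I understand it (readable, then expired but still stored, then purgeable once the grace period has passed and a compaction runs). It is not how Cassandra is implemented, and `TtlLifecycleSketch` and all of its members are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of rows with one TTL'd column each; NOT Cassandra's real implementation. */
class TtlLifecycleSketch {

    /** A stored value with an absolute expiry time, in seconds. */
    static final class Cell {
        final String value;
        final long expiresAt;

        Cell(String value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Cell> rows = new HashMap<>();
    private final long gcGraceSeconds;

    TtlLifecycleSketch(long gcGraceSeconds) {
        this.gcGraceSeconds = gcGraceSeconds;
    }

    /** Step 1: store the value with a TTL. */
    void put(String rowKey, String value, long now, long ttlSeconds) {
        rows.put(rowKey, new Cell(value, now + ttlSeconds));
    }

    /** Step 2: once the TTL has elapsed, reads return null although the cell is still stored. */
    String get(String rowKey, long now) {
        Cell cell = rows.get(rowKey);
        return (cell == null || now >= cell.expiresAt) ? null : cell.value;
    }

    /** Steps 3 and 4: a compaction drops only cells that expired at least gcGraceSeconds ago. */
    void compact(long now) {
        rows.values().removeIf(cell -> now >= cell.expiresAt + gcGraceSeconds);
    }

    /** Number of rows still physically stored, expired or not. */
    int storedRowCount() {
        return rows.size();
    }
}
```

In this model, a row written at time 0 with a TTL of 5 and a grace period of 5 reads as null from time 5 on, but `storedRowCount()` only drops to 0 after `compact` is called at time 10 or later. That is the gap the last step of the test has to bridge.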
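As I found out (for example here, here, and elsewhere), I need to run a compaction. Similar questions have been asked (e.g. Hector (Cassandra) Delete Anomaly), but I did not find anything that helped, and googling did not help much either.
So the question is: how can I force a compaction from my integration test (using Hector) to make sure it behaves as expected? Or is there another way to do this?
P.S. Truncating the column family is not an option.
Here are the details.
My test: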
private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";
private static final int GC_GRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr",
        "localhost:9160");
private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster,
            new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE,
            COLUMN_FAMILY, GC_GRACE_SECONDS);
}
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";

    logger.info("before persisting rows count is {}", countRows());
    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);
    logger.info("after persisting rows count is {}", countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);
    logger.info("before TTL passes rows count is {}", countRows());

    // wait for the 5-second TTL to elapse
    TimeUnit.SECONDS.sleep(6);
    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);
    logger.info("after TTL passes rows count is {}", countRows());

    // wait for GC_GRACE_SECONDS to pass
    TimeUnit.SECONDS.sleep(10);
    logger.info("wait 10 more seconds... rows count is {}", countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);
    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}", countRows);
    assertEquals(0, countRows);
}
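The fixed sleeps in the test above could be replaced by a polling wait that returns as soon as the condition holds. A minimal sketch in plain Java, independent of Hector and Cassandra; `PollingSketch` and `waitUntil` are hypothetical names, not part of any library:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition until it holds or a timeout expires. */
class PollingSketch {

    /**
     * Returns true as soon as the condition holds, or false if the timeout
     * expires (or the thread is interrupted) before that happens.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test this would be used as something like `assertTrue(PollingSketch.waitUntil(() -> countRows() == 0, 120_000, 1_000))`. That shortens the run when the row disappears early, but it does not by itself make a compaction happen.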
The persistence code:
public void persistObjectWithTtl(Object rowKey, Object columnName,
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj,
            SERIALIZER, SERIALIZER);
    // TTL is in seconds
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}
Setting GcGraceSeconds for the column family:
private void addColumnFamily(String keySpaceName, String columnFamilyName,
        int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition =
            createColumnFamilyDefinition(keySpaceName, columnFamilyName);
    ThriftCfDef columnFamilyWithGCGraceSeconds =
            new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);
    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}
And the code for counting rows, found on SO:
public int countRows() {
    int rowCount = 100; // page size
    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer,
                    serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(rowCount);
    Object lastKey = null;
    int i = 0;
    while (true) {
        // each page starts at the last key of the previous page
        rangeSlicesQuery.setKeys(lastKey, null);
        QueryResult<OrderedRows<Object, Object, Object>> result =
                rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

        // skip the first row of every page after the first: it was already counted
        if (lastKey != null && rowsIterator.hasNext()) {
            rowsIterator.next();
        }

        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            i++;
            // note: this check comes after i++, so rows without columns are still counted
            if (row.getColumnSlice().getColumns().isEmpty()) {
                continue;
            }
        }

        if (rows.getCount() < rowCount) {
            break;
        }
    }
    return i;
}
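For comparison, here is the same paging pattern against an in-memory sorted map, with the emptiness check placed before the increment so that rows without columns are not counted. It assumes that a row whose columns have all expired can still come back from a range query with an empty column list; whether that matches a given Cassandra version is something I have not verified. `PagedRowCountSketch`, `page`, and `countLiveRows` are hypothetical names, and the map only stands in for the column family.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/** Paging over a sorted map as a stand-in for a range slices query; illustration only. */
class PagedRowCountSketch {

    /** Returns up to pageSize rows starting at startKey (inclusive), or from the first key if null. */
    static List<Map.Entry<String, List<String>>> page(
            NavigableMap<String, List<String>> store, String startKey, int pageSize) {
        NavigableMap<String, List<String>> tail =
                startKey == null ? store : store.tailMap(startKey, true);
        List<Map.Entry<String, List<String>>> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : tail.entrySet()) {
            if (out.size() == pageSize) {
                break;
            }
            out.add(entry);
        }
        return out;
    }

    /** Counts rows that have at least one column, walking the store page by page. */
    static int countLiveRows(NavigableMap<String, List<String>> store, int pageSize) {
        if (pageSize < 2) {
            // with a page size of 1, every page would contain only the overlapping key
            throw new IllegalArgumentException("pageSize must be at least 2");
        }
        String lastKey = null;
        int count = 0;
        while (true) {
            List<Map.Entry<String, List<String>>> rows = page(store, lastKey, pageSize);
            // every page after the first starts with the key already seen on the previous page
            boolean skipFirst = lastKey != null;
            for (Map.Entry<String, List<String>> row : rows) {
                if (skipFirst) {
                    skipFirst = false;
                    continue;
                }
                lastKey = row.getKey();
                if (!row.getValue().isEmpty()) {
                    count++;
                }
            }
            if (rows.size() < pageSize) {
                break;
            }
        }
        return count;
    }
}
```

Note that counting this way answers a different question than the test asks: it tells you how many rows still have live columns, not whether the empty rows have been physically removed by compaction.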
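Thanks.
Update:
The reason was that the amount of data was not enough for a compaction to run, so I needed to put in more data and flush the tables to disk more often. I ended up with the following test case: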
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        // flush to disk every 100 rows
        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    // poll until at least some rows have been removed
    waitAtMost(Duration.TWO_MINUTES)
            .pollDelay(Duration.TWO_SECONDS)
            .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
            .until(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    int countRows = countRows();
                    logger.info("the rows count is {}", countRows);
                    return countRows < expectedAmount;
                }
            });
}
Full code: test class and sut