
We want to test that if a column has a TTL (time-to-live), it is eventually removed from Cassandra completely, together with the empty row that contains it.

As I understand it, the algorithm for testing this behavior is:

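  • When saving the object, set a TTL on the column
  • Wait for the TTL to pass and check that the returned value is null
  • Wait for the GC_GRACE_SECONDS period to pass
  • Check that the row has been removed as well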

The last check is the one I can't get to pass.
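The four steps above hinge on one timing rule, sketched below as a simplified model (my assumption about the semantics, not Cassandra code; the class TtlTimeline and its methods are hypothetical). An expired column behaves like a tombstone from write time + TTL onward, and that tombstone, together with the now-empty row, can only be dropped by a compaction that runs after a further gc_grace_seconds. Waiting alone is therefore not enough for the last step.

```java
import java.time.Instant;

/**
 * Simplified model (an assumption, not Cassandra source) of when a TTL'd column
 * stops being readable and when its tombstone may be purged.
 */
final class TtlTimeline {
    private TtlTimeline() {}

    /** From this instant on, reads return null for the column. */
    static Instant expiresAt(Instant writtenAt, int ttlSeconds) {
        return writtenAt.plusSeconds(ttlSeconds);
    }

    /** After this instant, a compaction is allowed to drop the expired column and the empty row. */
    static Instant purgeableAfter(Instant writtenAt, int ttlSeconds, int gcGraceSeconds) {
        return expiresAt(writtenAt, ttlSeconds).plusSeconds(gcGraceSeconds);
    }

    /**
     * Removal needs BOTH conditions: the grace period has elapsed and a compaction
     * actually runs over the data at {@code now}. Time passing alone is not enough.
     */
    static boolean removedBy(Instant now, Instant writtenAt, int ttlSeconds,
                             int gcGraceSeconds, boolean compactionRan) {
        return compactionRan
                && now.isAfter(purgeableAfter(writtenAt, ttlSeconds, gcGraceSeconds));
    }
}
```

With the question's values (TTL 5 s, gc_grace 5 s), the model says the row may disappear any time after 10 s, but only if a compaction runs; without one, the 136 s of sleeping in the test changes nothing.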

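As I found out (for example here and here, among other places), I need to run a compaction. Similar questions have been asked before (e.g. Hector (Cassandra) Delete Anomaly), but none of them helped me, and googling didn't help much either.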

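So the question is: how can I force a compaction from my integration test (using Hector) to make sure the row is removed as expected? Or is there another way to do this?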

P.S. Truncating the column family is not an option.


Here are the details.

My test:

private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";

private static final int GC_GRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", 
    "localhost:9160");

private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster, 
        new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE, 
        COLUMN_FAMILY, GC_GRACE_SECONDS);
}

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";
    logger.info("before persisting rows count is {}", countRows());

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);

    logger.info("after persisting rows count is {}", countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);

    logger.info("before TTL passes rows count is {}", countRows());

    TimeUnit.SECONDS.sleep(6);

    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);

    logger.info("after TTL passes rows count is {}", countRows());

    TimeUnit.SECONDS.sleep(10);

    logger.info("wait 10 more seconds... rows count is {}", countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);

    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}", countRows);
    assertEquals(0, countRows);
}
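The fixed sleeps above (6 s, 10 s, 120 s) either waste time or make the test flaky. A small polling helper, sketched here in plain Java 8 (the Poll class is hypothetical; the update below uses Awaitility for the same purpose), re-checks a condition until it holds or a timeout expires:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: poll a condition instead of sleeping for a fixed time. */
final class Poll {
    private Poll() {}

    /**
     * Returns true as soon as {@code condition} holds, or false once {@code timeout}
     * has elapsed without it holding. The condition is checked before each sleep.
     */
    static boolean until(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

In the test, the last two sleeps could become something like `assertTrue(Poll.until(() -> countRows() == 0, Duration.ofMinutes(2), Duration.ofSeconds(1)));`. Polling only shortens the wait; it does not make the row disappear if no compaction ever runs.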

Persistence code:

public void persistObjectWithTtl(Object rowKey, Object columnName, 
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj, 
            SERIALIZER, SERIALIZER);
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}

Setting GcGraceSeconds for the column family:

private void addColumnFamily(String keySpaceName, String columnFamilyName, 
            int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition = 
        createColumnFamilyDefinition(keySpaceName, columnFamilyName);

    ThriftCfDef columnFamilyWithGCGraceSeconds = 
        new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}

And the code for counting rows, which I found on SO:

public int countRows() {
    int rowCount = 100;

    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer, 
                serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(rowCount);

    Object lastKey = null;

    int i = 0;
    while (true) {
        rangeSlicesQuery.setKeys(lastKey, null);

        QueryResult<OrderedRows<Object, Object, Object>> result = 
            rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

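        // Every page after the first starts with lastKey itself (the start key
        // is inclusive), so skip that already-counted row.
        if (lastKey != null && rowsIterator.hasNext()) {
            rowsIterator.next();
        }

        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            // Rows with an empty column slice (expired/deleted rows that are
            // still returned by the range scan) are counted on purpose: they
            // are exactly what this test is waiting to see disappear.
            i++;
        }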

        if (rows.getCount() < rowCount) {
            break;
        }

    }

    return i;
}
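The paging loop in countRows works because each follow-up query starts at lastKey inclusively, so every page after the first repeats the previous page's last row. The sketch below models that with a TreeMap standing in for the column family (PagedCount and page() are hypothetical stand-ins, not Hector API). With this loop shape a page size of 1 would never advance, so the sketch rejects it. Also, as far as I know, Thrift range scans keep returning deleted rows with an empty column list ("range ghosts") until a compaction purges their tombstones, which is why the count only drops after compaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

/** Hypothetical model of the inclusive-start-key paging used by countRows(). */
final class PagedCount {
    private PagedCount() {}

    /** Stand-in for one range-slices query: up to pageSize keys from startKey (inclusive). */
    static <V> List<String> page(NavigableMap<String, V> rows, String startKey, int pageSize) {
        NavigableMap<String, V> tail = (startKey == null) ? rows : rows.tailMap(startKey, true);
        List<String> keys = new ArrayList<>();
        for (String key : tail.keySet()) {
            if (keys.size() == pageSize) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }

    static <V> int countAll(NavigableMap<String, V> rows, int pageSize) {
        if (pageSize < 2) {
            // A page that only contains the repeated start key would loop forever.
            throw new IllegalArgumentException("pageSize must be at least 2");
        }
        String lastKey = null;
        int count = 0;
        while (true) {
            List<String> keys = page(rows, lastKey, pageSize);
            // Skip the first key of every page except the first: it was counted already.
            int start = (lastKey != null && !keys.isEmpty()) ? 1 : 0;
            for (int i = start; i < keys.size(); i++) {
                lastKey = keys.get(i);
                count++;
            }
            // A short page means the scan reached the end of the key range.
            if (keys.size() < pageSize) {
                return count;
            }
        }
    }
}
```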

Thanks.


Update:

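The reason was that there wasn't enough data for a compaction to run, so I needed to insert more data and flush the memtables to disk more often. I ended up with the following test case: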

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    waitAtMost(Duration.TWO_MINUTES)
        .pollDelay(Duration.TWO_SECONDS)
        .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
        .until(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                int countRows = countRows();
                logger.info("the rows count is {}", countRows);
                return countRows < expectedAmount;
            }
        });
}

Full code: test class and sut


Since you're using Java, you can easily force a compaction over JMX by calling the forceTableCompaction(keyspace, columnFamily) method of the org.apache.cassandra.db.StorageService MBean.
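A minimal sketch of that JMX call, assuming Cassandra listens for JMX on its default port 7199 without authentication, and a 1.x node whose StorageServiceMBean declares forceTableCompaction(String, String...) (later versions renamed the operation, e.g. to forceKeyspaceCompaction). The ForceCompaction class is hypothetical. Over JMX the varargs parameter travels as a String[], so the signature array has to name the array type:

```java
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical helper that asks a running Cassandra node to compact one column family. */
final class ForceCompaction {
    private ForceCompaction() {}

    static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";

    /** Standard RMI-based JMX URL for a node listening on host:port. */
    static JMXServiceURL serviceUrl(String host, int port) throws MalformedURLException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    static void forceTableCompaction(String host, int port, String keyspace, String columnFamily)
            throws Exception {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // forceTableCompaction(String tableName, String... columnFamilies):
            // the varargs argument is passed as a String[] and declared as "[Ljava.lang.String;".
            mbeans.invoke(new ObjectName(STORAGE_SERVICE_MBEAN),
                    "forceTableCompaction",
                    new Object[] {keyspace, new String[] {columnFamily}},
                    new String[] {String.class.getName(), String[].class.getName()});
        }
    }
}
```

When Cassandra runs embedded in the test JVM, as in the question, calling StorageService.instance.forceTableCompaction(...) directly (as the update does) avoids JMX entirely; the JMX route is for a node running in a separate process.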

answered 2013-02-02T17:04:22.780