1

我正在尝试实现一个 Cassandra 触发器,这样当 keyspace1.tableA 上有更新或删除时,触发器将向 keyspace1.tableB 添加一行。

tableB 中的列名与 tableA 中的列名完全不同。

我正在使用 Cassandra 2.1,无法迁移到更新的版本。查看https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java上的 InvertedIndex 触发器示例,我可以看到添加突变:

从 InvertedIndex 示例:

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
            mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
            mutations.add(mutation);
        }
    }

挑战在于,在此示例中,传递给 mutation.add 的单元名称是 cell.name() ,它是一个现有对象,我们可以使用该函数获取其名称。

现在,我只是想存储对 tableA 进行更改的时间,所以 tableB 有两列:

  • 更改时间timeuuid
  • 操作文本

我需要添加一个突变,它将在 tableB 中添加一行,并执行更改时间和操作。如何在 Cassandra 2.1.12 中添加这样的行突变?

我已经尝试过了,但在触发器中出现空指针异常:

...
String keycol = "changetime";
ByteBuffer uuidKey = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); 
ColumnIdentifier ci = new ColumnIdentifier(keycol, false);
CellName cn = CellNames.simpleSparse(ci);
mutation = new Mutation(keyspace, uuidKey);
mutation.add(tableName,cn, uuidKey, System.currentTimeMillis());
...

任何帮助将不胜感激 - 我不了解 Cassandra 的内部结构,所以没有多少细节是太多的信息。

4

1 回答 1

1

答案是使用 CFMetaData 比较器来创建 Mutation.add(...) 所需的 CellName。为了提供一个具体示例,我将使用来自https://github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers的 Cassandra 3.0 AuditTrigger 中的架构和示例

在这种情况下,我们将写入的表是 test.audit 表,定义如下:

CREATE TABLE test.audit (key timeuuid, keyspace_name text,
    table_name text, primary_key text, PRIMARY KEY(key));

该表有一个名为“key”的分区键并且没有聚簇列。有关定义,请参见https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt,“分区键和集群列”部分。

需要注意这一点,因为对 makeCellName 的调用(我们将在下面的示例代码中看到)采用一个可变参数列表,其中每个参数是我们希望相应聚类列对将受到影响的行采用的值, 最后一个参数是文本格式的列名。

当没有聚类列时(如此模式中的情况),对 makeCellName 的调用采用单个参数:列的名称。

将所有这些放在一起,Cassandra 2.1 的 AuditTrigger 函数与 3.0 示例执行相同的操作如下所示:

public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
    CFMetaData cfm = update.metadata();

    List<Mutation> mutations = new ArrayList<>(update.getColumnCount());

    String keyspaceName = "";
    String tableName    = "";
    String keyStr       = "";

    keyspaceName = cfm.ksName;
    tableName = cfm.cfName;

    try {
        keyStr = ByteBufferUtil.string(key);
    } catch (CharacterCodingException e) {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        logger.error(errors.toString());
    }

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            CFMetaData other = Schema.instance.getCFMetaData("test","audit");
            CellNameType cnt = other.comparator;

            ByteBuffer auditkey = UUIDType.instance.decompose(UUIDGen.getTimeUUID());

            // create CellName objects for each of the columns in the audit table row we are inserting
            CellName primaryKeyCellName = cnt.makeCellName("primary_key");
            CellName keyspaceCellName = cnt.makeCellName("keyspace_name");
            CellName tableCellName = cnt.makeCellName("table_name");

            try {
                // put the values we want to write to the audit table into ByteBuffer objects
                ByteBuffer ksvalbb,tablevalbb,keyvalbb;
                ksvalbb=ByteBuffer.wrap(keyspaceName.getBytes("UTF8"));
                tablevalbb=ByteBuffer.wrap(tableName.getBytes("UTF8"));
                keyvalbb=ByteBuffer.wrap(keyStr.getBytes("UTF8"));

                // create the mutation object
                Mutation mutation = new Mutation(keyspaceName, auditkey);

                // get the time which will be needed for the call to mutation.add
                long mutationTime=System.currentTimeMillis();

                // add each of the column values to the mutation
                mutation.add("audit", primaryKeyCellName, keyvalbb,  mutationTime);
                mutation.add("audit", keyspaceCellName,  ksvalbb,  mutationTime);
                mutation.add("audit", tableCellName, tablevalbb,  mutationTime);

                mutations.add(mutation);
            } catch (UnsupportedEncodingException e) {
                StringWriter errors = new StringWriter();
                e.printStackTrace(new PrintWriter(errors));
                logger.error(errors.toString());
            }
        }
    }
    return mutations;
} 
于 2016-02-19T18:41:24.480 回答