4

我正在开发一个需要在 Cassandra 数据库中存储 Avro Schema 的系统。所以在 Cassandra 我们将存储这样的东西

SchemaId            AvroSchema

1                   some schema
2                   another schema

现在假设我在 Cassandra 的上表中插入另一行,现在表是这样的 -

SchemaId            AvroSchema

1                   some schema
2                   another schema
3                   another new schema

一旦我在上表中插入新行 - 我需要告诉我的 Java 程序去提取新的模式 id 和相应的模式..

解决这类问题的正确方法是什么?

我知道,一种方法是每隔几分钟进行一次轮询,假设每 5 分钟我们会从上表中提取数据,但这不是解决此问题的正确方法,因为每 5 分钟一次,我正在拉是否有任何新模式..

但除此之外还有其他解决方案吗?

我们可以使用 Apache Zookeeper 吗?还是 Zookeeper 不适合这个问题?或者任何其他解决方案?

我正在运行 Apache Cassandra 1.2.9

4

2 回答 2

1

一些解决方案:

  • 使用数据库触发器:Cassandra 2.0 有一些触发器支持,但它看起来不是最终版本,根据这篇文章可能会在 2.1 中有所改变: http ://www.datastax.com/dev/blog/whats-new-in- cassandra-2-0-prototype-triggers-support。触发器是一种常见的解决方案。
  • 您提出了民意调查,但这并不总是一个糟糕的选择。特别是如果您有一些东西将该行标记为尚未被拉出,那么您可以将新行从 Cassandra 中拉出。如果查询成本不高,那么每 5 分钟拉一次对于 Cassandra 或任何数据库来说都不是明智之举。如果很少插入新行,则此选项可能不好。

Zookeeper 不会是一个完美的解决方案,请参阅此引用:

由于 watch 是一次性触发器,并且在获取事件和发送新请求以获取 watch 之间存在延迟,因此您无法可靠地看到 ZooKeeper 中节点发生的每一次更改。准备好处理 znode 在获取事件和再次设置手表之间多次更改的情况。(你可能不在乎,但至少意识到它可能会发生。)

引用来自:http: //zookeeper.apache.org/doc/r3.4.2/zookeeperProgrammers.html#sc_WatchRememberThese

于 2013-10-24T18:04:10.843 回答
1

卡桑德拉 3.0

您可以使用它,它会将插入中的所有内容作为 json 对象获取。

public class HelloWorld implements ITrigger
{
    private static final Logger logger = LoggerFactory.getLogger(HelloWorld.class);

    public Collection<Mutation> augment(Partition partition)
    {
        String tableName = partition.metadata().cfName;
        logger.info("Table: " + tableName);

        JSONObject obj = new JSONObject();
        obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));

        try {
            UnfilteredRowIterator it = partition.unfilteredIterator();
            while (it.hasNext()) {
                Unfiltered un = it.next();
                Clustering clt = (Clustering) un.clustering();  
                Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
                Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();

                while(columns.hasNext()){
                    ColumnDefinition columnDef = columns.next();
                    Cell cell = cells.next();
                    String data = new String(cell.value().array()); // If cell type is text
                    obj.put(columnDef.toString(), data);
                }
            }
        } catch (Exception e) {

        }
        logger.debug(obj.toString());

        return Collections.emptyList();
    }
}
于 2016-09-14T17:44:40.523 回答