1

我正在学习如何将 Storm 的 Trident 与 Cassandra 2.0.5、Storm 版本 0.9.0.1 一起使用。我也在使用 com.hmsonlinestorm-cassandra 0.4.0-rc4 contrib。

我的目标只是将一些文本行插入到具有 id (int)、name (text) 和 sentence (text) 列的表中。id 和 name 是主键。

partitionPersist需要一个,StateUpdater为此我正在使用com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>. 但从看起来它只有一个键作为输入而不是两个(我需要 id 和 name)。元组映射器 ( TridentTupleMapper) 也使用一个键:

TridentTupleMapper<K, C, V> tupleMapper

也许我遗漏了一些东西,但是如何将多个列定义为键?

4

1 回答 1

2

让我向您指出 Brian 和我一直在研究的使用 cassandra 和 Storm 的项目:https ://github.com/hmsonline/storm-cassandra-cql

您可以查看几个示例来了解如何开发适合您的键/列映射的 CqlTupleMapper。代码仍在开发中,但有一个适用于 CQL3 的支持映射实现,可用于持久化聚合以及仅存储分区持久性。

根据您的需要,您需要定义一个 trident 拓扑,通过以下方法对传入的数据(句子)进行分组:

inputStream.groupBy(new Field("sentences"))

然后,您将实现一个 CqlTupleMapper——特别是 map(K key, V value),它有一个自定义 CQL insert 语句,将键映射到其传递的值。您的查询将类似于:

@Override
public Statement map(List<String> keys, String value) {
    Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);
    statement.value("id", keys.get(0));
    statement.value("name", keys.get(1));
    statement.value("sentence", value);
    return statement;
}

我希望这会有所帮助。

于 2014-02-22T01:54:25.603 回答