14

首先:我知道在 Cassandra 中进行全面扫描不是一个好主意,但是,目前,这就是我所需要的。

当我开始寻找这样的事情时,我读到人们说不可能在 Cassandra 中进行全面扫描,而且他不是被迫做这种事情的。

不满意,我一直在寻找,直到找到这篇文章: http ://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

看起来很合理,我试了一下。由于我将只进行一次完整扫描并且时间和性能不是问题,因此我编写了查询并将其放入一个简单的作业中以查找我想要的所有记录。从 20 亿行记录中,我的预期输出是 1000 条左右,但是,我只有 100 条记录。

我的工作:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(myCollumn)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong("token(rowid)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

基本上我所做的是搜索允许的最小令牌并逐步进行直到最后一个。

我不知道,但就像这项工作没有完全完成全扫描,或者我的查询只访问了一个节点或其他东西。我不知道我是否做错了什么,或者真的不可能进行全面扫描。

今天我有将近 2 TB 的数据,在一个由七个节点组成的集群中只有一个表。

有人已经遇到过这种情况或有什么建议吗?

4

5 回答 5

8

在 Cassandra 中进行全表扫描绝对是可能的——事实上,这对于 Spark 之类的东西来说很常见。但是,它通常不是“快速”的,因此除非您知道自己为什么这样做,否则不鼓励这样做。对于您的实际问题:

1)如果你使用 CQL,你几乎可以肯定使用 Murmur3 分区器,所以你的最小令牌是 -9223372036854775808(最大令牌是 9223372036854775808)。

2) 您正在使用 session.execute(),它将使用默认一致性 ONE,它可能不会返回集群中的所有结果,特别是如果您也在 ONE 上编写,我怀疑您可能是。将其提高到 ALL,并使用准备好的语句来加速 CQL 解析:

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = -9223372036854775808;
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(myCollumn)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong("token(rowid)");
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
             actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }
于 2015-04-28T03:49:29.007 回答
2

我强烈推荐使用 Spark——即使是在一个独立的应用程序中(即没有集群)。它将负责对分区进行分块并一一处理。也很容易使用:

https://github.com/datastax/spark-cassandra-connector

于 2015-04-28T15:10:05.593 回答
1

这是您需要做的常见事情吗?还是一种情况?我同意这不是你想要定期做的可取的事情,但我也遇到了一个问题,我必须阅读 ColumnFamily 中的所有行,我依赖于Astyanax客户端的AllRowsReader 配方。我看到您正在使用 Datastax CQL 驱动程序连接到您的集群,但是如果您正在寻找的东西被证明是有效的,那么您可能不关心使用 Astyanax 库处理问题。

在我的情况下,我曾经读取所有行键,然后我有另一项工作来使用我收集的键与 ColumnFamily 交互。

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;

import java.util.concurrent.CopyOnWriteArrayList;

...        

private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;

public List<String> getAllKeys() throws Exception {

    final List<String> rowKeys = new CopyOnWriteArrayList<>();

    new AllRowsReader.Builder<>(keyspace, columnFamily).withColumnRange(null, null, false, 0)
        .withPartitioner(null) // this will use keyspace's partitioner
        .withConsistencyLevel(ConsistencyLevel.CL_ONE).forEachRow(row -> {
        if (row == null) {
            return true;
        }

        String key = row.getKey();

        rowKeys.add(key);

        return true;
    }).build().call();

    return rowKeys;
}

有不同的配置选项可以在多个线程和许多其他事情中运行它,就像我说我只在我的代码中运行过一次并且工作得非常好,如果你在尝试让它工作时遇到问题,我很乐意提供帮助.

希望这可以帮助,

何塞·路易斯

于 2015-04-30T06:28:29.487 回答
1

这是一个非常古老的问题,但我正在回答它,因为我遇到了同样的问题,即没有获取所有行并找到了原因。

当一个分区键有多行时会出现此问题。

在上述实现中,当由于 LIMIT 限制而返回分区中间的一行时,该分区中的其余行将丢失。

这是因为在下一个查询中,where 语句将从下一个分区的值开始读取。

例如,假设我们有一个如下表

partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1
              1|     3|                     1
              2|     4|                     2
              2|     5|                     2
              2|     6|                     2
              3|     7|                     3
              4|     8|                     4

如果我们在这个表上运行上面带有 LIMIT 2 的示例代码,我们会得到......

第一次迭代

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 0 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1

第二次迭代

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 1 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              2|     4|                     2
              2|     5|                     2

第三次迭代

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 2 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              3|     7|                     3
              4|     8|                     4

结果,我们无法获得 idx 3 和 6。

这是一个常见的分页查询实现错误。

于 2021-08-13T10:02:46.443 回答
1

如果您经常需要对 Cassandra 表进行全表扫描,例如在 Spark 中进行分析,那么我强烈建议您考虑使用读取优化的数据模型来存储数据。您可以查看 http://github.com/tuplejump/FiloDB,了解 Cassandra 上的读取优化设置示例。

于 2016-05-16T14:51:57.077 回答