1

I am getting the following error:

22:24:34.419 [run-main-0] DEBUG com.websudos.phantom - Executing query: com.datastax.driver.core.BatchStatement@3f4f5b68
22:24:34.426 [pool-15-thread-3] ERROR com.websudos.phantom - Batch too large
[error] (run-main-0) com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large

Have re-run the code and getting this error at the following point each time:

cqlsh> select count(*) from superchain.blocks  limit 1000000;

 count
-------
 51728

(1 rows)

Warnings :
Aggregation query used without partition key

Thanks in advance for any insights.

+++ UPDATES +++

So the offending code is

//This file is Database.scala
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
  def insertBlock(block: Block) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.block.insertNewRecord(block))
      .future()
  }

  def insertTransaction(tx: Transaction) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.tx.insertNewTransaction(tx))
      .future()
  }

  object block extends BlockTable with keyspace.Connector

  object tx extends TransactionTable with keyspace.Connector


}

object ChainDatabase extends Database(Config.keySpaceDefinition)

The following shows the inserting functions for Transaction and there is similar code for the Block.

Have tried to follow

https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e#.7zdd0qopv

&&

https://github.com/outworkers/phantom/wiki/Batch-statements

But am still struggling to find an implementation that does not lead to the Batch too large errors.

//This file is Transaction.scala
abstract class TransactionTable extends TransactionColumnFamily with RootConnector {

  override val tableName = "transactions"

  def insertNew(tx: Transaction): Future[ResultSet] = insertNewTransaction(tx).future()

  def insertNewTransaction(tx: Transaction) = {
    insert
      .value(_.txid, tx.txid)
      .value(_.version, tx.version)
      .value(_.locktime, tx.locktime)
      .value(_.vout, tx.vout)
      .value(_.vin, tx.vin)
  }

}
4

3 回答 3

2

也许你误解了 Cassandra 中批处理的目的。

实际上,它们是为了原子性而不是为了“更快”运行多个查询。

可以在这里找到一个很好的解释:

https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/

于 2016-04-06T09:33:11.653 回答
2

您得到的错误不是因为表的大小,而是因为批处理中的查询数量。在任何给定的批次中,您最多可以同时运行 100 个查询。

同时,您几乎 99% 都在使用不太理想的方法,因为您从来没有真正想要在一个批次中进行如此多的查询。正如 Thiago 所建议的,批处理旨在保证原子性,而不是优化性能。

如果您只想简单地进行并行查询,只需使用Future.sequence,它将使用 fork join pool 类型的方法来并行化操作。

错误来自 Cassandra,而不是来自幻影。无论您在客户端使用哪种方法,批量大小都是有上限的。

// Assuming you have a list of queries:
val execution = Future.sequence(queries map (_.future())

希望这可以帮助!

更新

假设您有一个交易清单。

val list: List[Transaction] = ..
// all you need is
Future.sequence(list.map(tr => database.transactionTable.insertNew(tr))

这将产生一个在所有基础期货完成时完成的期货,有效地为您提供返回类型:Future[List[ResultSet]]从您的原始List[Future[ResultSet]].

于 2016-04-06T11:50:33.177 回答
1

正如其他人所说,您的第一条错误消息来自非常大的 BATCH 语句。BATCH 语句不是为批量插入而设计的,就像您在传统关系数据库中所想的那样。BATCH 语句仅在自动跨多个非规范化表插入数据或使用 UNLOGGED BATCH 在同一分区键下插入数据时有用。

批处理语句不应该用作优化技术,因为它们不是为速度而设计的,实际上会损害您的性能。

最后,这是一条错误消息,因为 Cassandra 客户端驱动程序试图保护集群免受一个非常大的 BATCH 语句的影响,该语句可以(并且将)关闭集群中的节点。

其次,您指出运行 aSELECT count(*) FROM table;会给您警告:

Aggregation query used without partition key.

使用count(*)is 而不指定分区键是一种反模式。出于与上述类似的原因,它可能会对集群的稳定性产生负面影响。

最后,我怀疑在您的 Cassandra DSL 库中的某个地方(不熟悉 Phantom-DSL)它正在执行您不期望它的 BATCH,或者您可能有意识地使用 BATCH 而没有完全理解它的适当用法。我知道在 spring-data 中,当您插入项目列表(这是一个可怕的反模式)时,它们会使用 BATCH,这可能会导致类似的错误。

于 2016-04-06T21:00:13.260 回答