2

当尝试一次将数千条记录插入远程 Cassandra 数据库时,我重复地遇到超时(在慢速连接上有 5 到 6 千个元素)

错误:

All host(s) tried for query failed (tried: /...:9042
(com.datastax.driver.core.exceptions.OperationTimedOutException: [/...]
Timed out waiting for server response))
com.datastax.driver.core.exceptions.NoHostAvailableException: 
All host(s) tried for query failed (tried: /...:9042
(com.datastax.driver.core.exceptions.OperationTimedOutException: [/...]
Timed out waiting for server response))

该模型:

class RecordModel extends CassandraTable[ConcreteRecordModel, Record] {

  object id extends StringColumn(this) with PartitionKey[String]

...
abstract class ConcreteRecordModel extends RecordModel 
    with RootConnector with ResultSetFutureHelper {

def store(rec: Record): Future[ResultSet] = 
    insert.value(_.id, rec.id).value(...).future()

def store(recs: List[Record]): Future[List[ResultSet]] = Future.traverse(recs)(store)

连接器:

val connector = ContactPoints(hosts).withClusterBuilder(
  _.withCredentials(
    config.getString("username"),
    config.getString("password")
  ).withPoolingOptions(
    new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 4)
      .setMaxConnectionsPerHost(HostDistance.LOCAL, 10)
      .setCoreConnectionsPerHost(HostDistance.REMOTE, 2)
      .setMaxConnectionsPerHost(HostDistance.REMOTE, 4)
      .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
      .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000)
      .setPoolTimeoutMillis(10000)
  )
).keySpace(keyspace)

我尝试过单独和一起调整池选项。但即使将所有设置加倍REMOTE也没有明显改变超时

当前的解决方法,我想避免 - 将列表分成批次并等待每个批次完成:

def store(recs: List[Record]): Future[List[ResultSet]] = {
  val rs: Iterator[List[ResultSet]] = recs.grouped(1000) map { slice =>
    Await.result(Future.traverse(slice)(store), 100 seconds)
  }
  Future.successful(rs.to[List].flatten)
}

处理这个问题的好方法是什么?

谢谢

编辑

这些错误确实表明集群出现故障/过载,但我怀疑网络在这里起主要作用。上面提供的数字来自远程机器。当同一个数据中心的机器提供相同的 C* 时,它们要高得多。另一个可疑的细节是,使用quill提供相同的 C* 实例不会遇到任何超时问题,无论是否远程。

我真正不喜欢节流的是批量大小是随机的和静态的,而它们应该是可适应的。

4

1 回答 1

1

听起来您正在达到集群的极限。如果您想避免超时,您将需要添加更多容量来处理负载。如果你只想进行突发写入,你应该限制它们(就像你正在做的那样),因为向太少的节点发送太多的查询会抑制性能。如果您想等到可以写入,您还可以增加服务器端的超时时间(read_request_timeout_in_ms、write_request_timeout_in_ms、request_timeout_in_ms),但这是不可取的,因为您不会给 Cassandra 任何时间来恢复并可能导致大量 ParNew GC。

于 2016-06-10T03:17:11.973 回答