17

我正在使用 datastax java driver 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在使用 QUORUM 一致性异步编写。

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

我上面的保存方法将以非常快的速度从多个线程中调用。

问题:

我想将请求限制为executeAsync异步写入 Cassandra 的方法的请求。如果我的写入速度超出我的 Cassandra 集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功进入 cassandra,而不会造成任何损失。

我看到了这篇文章,其中解决方案是使用Semaphore固定数量的许可证。但我不确定如何以及实现它的最佳方法是什么。我以前从未使用过 Semaphor。这就是逻辑。任何人都可以根据我的代码提供一个基于信号量的示例,或者如果有更好的方法/选项,那么也请告诉我。

在编写数据加载程序的上下文中,您可以执行以下操作:

  • 为了简单起见,请使用 Semaphore 或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用 executeAsync 提交查询时,都需要获得许可。您实际上应该只需要 1 个线程(但可能需要引入一个 # cpu cores size 的池来执行此操作)从 Semaphore 获取许可并执行查询。它只会阻止获取,直到有可用的许可证。
  • 将 Futures.addCallback 用于从 executeAsync 返回的未来。回调应在 onSuccess 和 onFailure 情况下调用 Sempahore.release()。通过释放许可,这应该允许您在步骤 1 中的线程继续并提交下一个请求。

此外,我还看到了其他几篇他们谈到使用的帖子RingBuffer,或者Guava RateLimitter哪一个更好,我应该使用?以下是我能想到的选项:

  • 使用信号量
  • 使用环形缓冲区
  • 使用 Guava 速率限制器

谁能帮我举一个例子,说明我们如何限制请求或获得 cassandra 写入的背压并确保所有写入成功进入 cassandra?

4

2 回答 2

9

不是权威答案,但也许会有所帮助。首先你应该考虑当查询不能马上执行时你会怎么做。无论您选择哪种速率限制,如果您收到的请求速率高于您可以写入 Cassandra 的速率,最终您的进程都会被等待请求阻塞。在那一刻,您需要告诉您的客户将他们的请求保留一段时间(“推回”)。例如,如果它们是通过 HTTP 来的,那么响应状态将是 429“Too Many Requests”。如果您在同一进程中生成请求,则确定可接受的最长超时时间。也就是说,如果 Cassandra 跟不上,那么是时候扩展(或调整)它了。

也许在实施速率限制之前,值得在调用save方法(使用 Thread.sleep(...))之前在线程中进行试验并添加人为延迟,看看它是否解决了您的问题或需要其他东西。

查询返回错误来自 Cassandra 的背压。但是您可以选择或实施RetryPolicy以确定何时重试失败的查询。

您还可以查看连接池选项(尤其是监控和调整池)。可以调整每个连接的异步请求数。但是文档说,对于 Cassandra 2.x,此参数上限为 128,并且不应更改它(不过我会尝试使用它:)

使用信号量的实现看起来像

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(在实际代码中,我会创建一个包装回调,它会释放许可,然后调用包装的方法)

Guava 的 RateLimiter 类似于 semaphore,但允许在未充分利用期后临时爆发,并根据时间限制请求(而不是活动查询的总数)。

但是无论如何,请求都会因各种原因而失败,因此最好制定一个如何重试它们的计划(以防出现间歇性错误)。

在您的情况下可能不合适,但我会尝试使用一些队列或缓冲区来将请求排入队列(例如java.util.concurrent.ArrayBlockingQueue)。“缓冲区已满”意味着客户端应该等待或放弃请求。缓冲区也将用于重新排队失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试。当队列已满并且同时有新的失败请求时,还应该以某种方式处理这种情况。然后,一个单线程工作人员会从队列中挑选请求并将它们发送到 Cassandra。由于它不应该做太多,它不太可能成为瓶颈。这个工人也可以应用它自己的速率限制,例如基于时间com.google.common.util.concurrent.RateLimiter

如果想尽可能地避免丢失消息,他可以在 Cassandra 前面放置一个具有持久性的消息代理(例如 Kafka)。这样,即使 Cassandra 长时间中断,传入的消息也可以存活。但是,我想,在你的情况下,这是矫枉过正的。

于 2017-01-02T12:56:50.800 回答
2

只需使用阻塞队列就可以了。期货是线程化的,回调(成功和失败)将充当消费者,无论您从哪里调用 save 方法,都将充当生产者。

更好的方法是,您将完整的请求本身放入队列中,并在每次出队时将其逐个触发保存。

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          queue.take();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.take();
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
}

public void invokeSaveInLoop(){
    Object dummyObj = new Object();
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
    for(int i=0; i< 1000; i++){
        save("process", clientid, deviceid, queue);
        queue.put(dummyObj);
    }
}

如果您想进一步检查集群中途的负载

public static String getCurrentState(){    
StringBuilder response = new StringBuilder();
            response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
            final LoadBalancingPolicy loadBalancingPolicy =
                    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
            final PoolingOptions poolingOptions =
                    cluster.getConfiguration().getPoolingOptions();
            Session.State state = session.getState();
            for (Host host : state.getConnectedHosts()) {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                                host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                                connections *
                                        poolingOptions.getMaxRequestsPerConnection(distance)))
                        .append("<br>\n");
            }
            return response.toString();
}
于 2017-01-08T18:52:20.317 回答