6

我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用maxRequestsPerConnection和设置了我的 Cassandra 集群maxConnectionsPerHost。但是,在测试中,我发现当我到达maxConnectionsPerHostmaxRequestsPerConnection呼吁session.executeAsync不要阻止时。

我现在正在做的是使用 anew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前增加它,并在完成时返回它executeAsync。这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

有没有人想出更好的解决方案来解决这个问题?

一个警告:我希望一个请求在完成之前被认为是未完成的。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性的超时)是我想要背压并停止使用队列中的消息的主要情况。

问题:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

当前解决方案:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

另外,任何人都可以看到这个解决方案有任何明显的问题吗?

4

2 回答 2

5

一个不杀死集群的可能想法是“限制”你的调用,executeAsync例如在一批 100 之后(或任何数字最适合你的集群和工作负载),你将在客户端代码中休眠并执行阻塞调用所有 100 个期货(或使用 Guava 库将未来列表转换为列表的未来)

这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有查询都成功,然后再继续。如果在调用 时捕获任何异常future.get(),您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已经尝试过重试。

关于来自服务器的背压信号,从 CQL 二进制协议 V3 开始,有一个错误代码通知客户端协调器过载https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3。规格#L951

从客户端,您可以通过 2 种方式获取此重载信息:

于 2016-02-10T22:38:21.060 回答
2

我现在正在做的是使用一个新的 Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) 并在每个异步请求之前递增它,并在 executeAsync 返回的未来完成时递减它。这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

这是一种非常合理的方法,它允许在其他请求完成时填写新请求。您可以将释放许可证与未来的完成联系起来。

驱动程序本身不这样做的原因是它试图尽可能少地阻塞,而是快速失败。不幸的是,这将一些责任推给了客户。

在通常情况下,一次同时向主机发送这么多请求是不好的。C* 有一个native_transport_max_threads设置(默认为 128),用于控制一次处理请求的线程数。最好将自己限制在每台主机的 2 * 那个数字上。(有关详细信息,请参阅:Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句

我希望在完成之前将其视为未完成的请求。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性的超时)是我想要背压并停止使用队列中的消息的主要情况。

在成功完成、用尽重试或由于某种原因失败之前,驱动程序不会完成未来。因此,您可以绑定信号量许可的释放,直到未来完成或失败。

于 2016-02-10T22:45:02.463 回答