我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用maxRequestsPerConnection
和设置了我的 Cassandra 集群maxConnectionsPerHost
。但是,在测试中,我发现当我到达maxConnectionsPerHost
并maxRequestsPerConnection
呼吁session.executeAsync
不要阻止时。
我现在正在做的是使用 anew Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
并在每个异步请求之前增加它,并在完成时返回它executeAsync
。这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。
有没有人想出更好的解决方案来解决这个问题?
一个警告:我希望一个请求在完成之前被认为是未完成的。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性的超时)是我想要背压并停止使用队列中的消息的主要情况。
问题:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
当前解决方案:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
另外,任何人都可以看到这个解决方案有任何明显的问题吗?