我正在使用 datastax java driver 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在使用 QUORUM 一致性异步编写。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的保存方法将以非常快的速度从多个线程中调用。
问题:
我想将请求限制为executeAsync
异步写入 Cassandra 的方法的请求。如果我的写入速度超出我的 Cassandra 集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功进入 cassandra,而不会造成任何损失。
我看到了这篇文章,其中解决方案是使用Semaphore
固定数量的许可证。但我不确定如何以及实现它的最佳方法是什么。我以前从未使用过 Semaphor。这就是逻辑。任何人都可以根据我的代码提供一个基于信号量的示例,或者如果有更好的方法/选项,那么也请告诉我。
在编写数据加载程序的上下文中,您可以执行以下操作:
- 为了简单起见,请使用 Semaphore 或其他具有固定数量许可的构造(这将是您的最大飞行请求数)。每当您使用 executeAsync 提交查询时,都需要获得许可。您实际上应该只需要 1 个线程(但可能需要引入一个 # cpu cores size 的池来执行此操作)从 Semaphore 获取许可并执行查询。它只会阻止获取,直到有可用的许可证。
- 将 Futures.addCallback 用于从 executeAsync 返回的未来。回调应在 onSuccess 和 onFailure 情况下调用 Sempahore.release()。通过释放许可,这应该允许您在步骤 1 中的线程继续并提交下一个请求。
此外,我还看到了其他几篇他们谈到使用的帖子RingBuffer
,或者Guava RateLimitter
哪一个更好,我应该使用?以下是我能想到的选项:
- 使用信号量
- 使用环形缓冲区
- 使用 Guava 速率限制器
谁能帮我举一个例子,说明我们如何限制请求或获得 cassandra 写入的背压并确保所有写入成功进入 cassandra?