感谢所有伟大的建议和想法,非常感谢!最终,这个讨论让我想出了一个不使用对象进行锁定的解决方案。
只是对我们实际在做什么的简要描述。
我们有一个缓存,可以从我们的环境中连续接收数据。缓存对于每个键都有几个“桶”,并在事件进入时将事件聚合到桶中。进入的事件有一个键,用于确定要使用的缓存条目,以及一个时间戳,用于确定缓存条目中应该使用的桶递增。
缓存还有一个定期运行的内部刷新任务。它将迭代所有缓存条目并将除当前存储桶之外的所有存储桶刷新到数据库。
现在传入数据的时间戳可以是过去的任何时间,但其中大部分是最近的时间戳。因此,当前存储桶将获得比先前时间间隔的存储桶更多的命中。
知道了这一点,我可以展示我们的竞争条件。所有这些代码都是针对单个缓存条目的,因为该问题与单个缓存元素的并发写入和刷新无关。
// buckets :: ConcurrentMap<Long, AtomicLong>
void incrementBucket(long timestamp, long value) {
long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
AtomicLong bucket = buckets.get(key);
if (null == bucket) {
AtomicLong newBucket = new AtomicLong(0);
bucket = buckets.putIfAbsent(key, newBucket);
if (null == bucket) {
bucket = newBucket;
}
}
bucket.addAndGet(value);
}
Map<Long, Long> flush() {
long now = System.currentTimeMillis();
long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);
Map<Long, Long> flushedValues = new HashMap<Long, Long>();
for (Long key : new TreeSet<Long>(buckets.keySet())) {
if (key != nowKey) {
AtomicLong bucket = buckets.remove(key);
if (null != bucket) {
long databaseKey = databaseKey(key);
long n = bucket.get()
if (!flushedValues.containsKey(databaseKey)) {
flushedValues.put(databaseKey, n);
} else {
long sum = flushedValues.get(databaseKey) + n;
flushedValues.put(databaseKey, sum);
}
}
}
}
return flushedValues;
}
可能发生的情况是:(fl = 刷新线程,它 = 增量线程)
- 它:进入 incrementBucket,执行到调用 addAndGet(value) 之前
- fl:进入flush并迭代桶
- fl:到达正在递增的桶
- fl: 删除它并调用 bucket.get() 并将值存储到刷新的值
- 它:增加存储桶(现在将丢失,因为存储桶已被刷新并移除)
解决方案:
void incrementBucket(long timestamp, long value) {
long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
boolean done = false;
while (!done) {
AtomicLong bucket = buckets.get(key);
if (null == bucket) {
AtomicLong newBucket = new AtomicLong(0);
bucket = buckets.putIfAbsent(key, newBucket);
if (null == bucket) {
bucket = newBucket;
}
}
synchronized (bucket) {
// double check if the bucket still is the same
if (buckets.get(key) != bucket) {
continue;
}
done = true;
bucket.addAndGet(value);
}
}
}
Map<Long, Long> flush() {
long now = System.currentTimeMillis();
long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);
Map<Long, Long> flushedValues = new HashMap<Long, Long>();
for (Long key : new TreeSet<Long>(buckets.keySet())) {
if (key != nowKey) {
AtomicLong bucket = buckets.get(key);
if (null != value) {
synchronized(bucket) {
buckets.remove(key);
long databaseKey = databaseKey(key);
long n = bucket.get()
if (!flushedValues.containsKey(databaseKey)) {
flushedValues.put(databaseKey, n);
} else {
long sum = flushedValues.get(databaseKey) + n;
flushedValues.put(databaseKey, sum);
}
}
}
}
}
return flushedValues;
}
我希望这对可能遇到相同问题的其他人有用。