2

我们正在编写一些锁定代码并且遇到了一个特殊的问题。我们使用 ConcurrentHashMap 来获取我们锁定的 Object 实例。所以我们的同步块看起来像这样

synchronized(locks.get(key)) { ... }

我们重写了 ConcurrentHashMap 的 get 方法,使其在不包含 key 的情况下总是返回一个新对象。

@Override
public Object get(Object key) {
   Object o = super.get(key);
   if (null == o) {
      Object no = new Object();
      o = putIfAbsent((K) key, no);
      if (null == o) {
         o = no;
      }
   }
   return o;
}

但是是否存在get方法已经返回对象,但是线程还没有进入同步块的状态。允许其他线程获取相同的对象并锁定它。

我们有一个潜在的竞争条件是

  • 线程1:获取key为A的对象,但不进入同步块
  • 线程2:获取key为A的对象,进入一个同步块
  • 线程 2:从地图中移除对象,退出同步块
  • 线程 1:与不再在映射中的对象进入同步块
  • 线程 3:获取密钥 A 的新对象(与线程 1 获取的对象不同)
  • 线程 3:进入一个同步块,而线程 1 也在其同步块中,两者都使用键 A

如果 java 在 get 调用返回后直接进入同步块,则不会出现这种情况。如果没有,是否有人对我们如何删除密钥而不必担心这种竞争条件有任何意见?

4

5 回答 5

4

正如我所看到的,问题源于您锁定地图值的事实,而实际上您需要锁定密钥(或它的某些派生)。如果我理解正确,您希望避免 2 个线程使用相同的键运行关键部分。

您可以锁定钥匙吗?你能保证你总是使用相同的密钥实例吗?

一个不错的选择:

根本不要删除锁。使用具有弱值的ReferenceMap 。这样,只有当它当前没有被任何线程使用时,映射条目才会被删除。

笔记:

1) 现在你必须同步这个地图(使用 Collections.synchronizedMap(..))。

2)您还需要同步为给定键生成/返回值的代码。

于 2010-11-09T15:16:33.213 回答
1

原样的代码是线程安全的。话虽如此,如果您从 CHM 中删除,那么在从集合返回的对象上进行同步时所做的任何类型的假设都将丢失。

但是是否存在get方法已经返回对象,但是线程还没有进入同步块的状态。允许其他线程获取相同的对象并锁定它。

是的,但是只要您在对象上进行同步,就会发生这种情况。需要保证的是,在另一个线程存在之前,另一个线程不会进入同步块。

如果没有,是否有人对我们如何删除密钥而不必担心这种竞争条件有任何意见?

确保这种原子性的唯一真正方法是在 CHM 或另一个对象(由所有线程共享)上同步。最好的方法是不要从 CHM 中删除。

于 2010-11-09T14:58:38.563 回答
1

你有两个选择:

一种。您可以在同步块内检查一次地图。

Object o = map.get(k);
synchronized(o) {
  if(map.get(k) != o) {
    // object removed, handle...
  }
}

湾。您可以扩展您的值以包含指示其状态的标志。当从映射中删除一个值时,您设置一个标志指示它已被删除(在同步块内)。

CacheValue v = map.get(k);
sychronized(v) {
  if(v.isRemoved()) {
    // object removed, handle...
  }
}
于 2010-11-09T16:40:27.977 回答
1

感谢所有伟大的建议和想法,非常感谢!最终,这个讨论让我想出了一个不使用对象进行锁定的解决方案。

只是对我们实际在做什么的简要描述。

我们有一个缓存,可以从我们的环境中连续接收数据。缓存对于每个键都有几个“桶”,并在事件进入时将事件聚合到桶中。进入的事件有一个键,用于确定要使用的缓存条目,以及一个时间戳,用于确定缓存条目中应该使用的桶递增。

缓存还有一个定期运行的内部刷新任务。它将迭代所有缓存条目并将除当前存储桶之外的所有存储桶刷新到数据库。

现在传入数据的时间戳可以是过去的任何时间,但其中大部分是最近的时间戳。因此,当前存储桶将获得比先前时间间隔的存储桶更多的命中。

知道了这一点,我可以展示我们的竞争条件。所有这些代码都是针对单个缓存条目的,因为该问题与单个缓存元素的并发写入和刷新无关。

// buckets :: ConcurrentMap<Long, AtomicLong>

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);
   AtomicLong bucket = buckets.get(key);
   if (null == bucket) {
      AtomicLong newBucket = new AtomicLong(0);
      bucket = buckets.putIfAbsent(key, newBucket);
      if (null == bucket) {
          bucket = newBucket;
      }
   }

   bucket.addAndGet(value);
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.remove(key);
         if (null != bucket) {
            long databaseKey = databaseKey(key);
            long n = bucket.get()
            if (!flushedValues.containsKey(databaseKey)) {
               flushedValues.put(databaseKey, n);
            } else {
               long sum = flushedValues.get(databaseKey) + n;
               flushedValues.put(databaseKey, sum);
            }
         }
      }
   }

   return flushedValues;
}

可能发生的情况是:(fl = 刷新线程,它 = 增量线程)

  • 它:进入 incrementBucket,执行到调用 addAndGet(value) 之前
  • fl:进入flush并迭代桶
  • fl:到达正在递增的桶
  • fl: 删除它并调用 bucket.get() 并将值存储到刷新的值
  • 它:增加存储桶(现在将丢失,因为存储桶已被刷新并移除)

解决方案:

void incrementBucket(long timestamp, long value) {
   long key = bucketKey(timestamp, LOG_BUCKET_INTERVAL);

   boolean done = false;
   while (!done) {
      AtomicLong bucket = buckets.get(key);
      if (null == bucket) {
         AtomicLong newBucket = new AtomicLong(0);
         bucket = buckets.putIfAbsent(key, newBucket);
         if (null == bucket) {
             bucket = newBucket;
         }
      }

      synchronized (bucket) {
         // double check if the bucket still is the same
         if (buckets.get(key) != bucket) {
            continue;
         }
         done = true;

         bucket.addAndGet(value);
      }
   }
}

Map<Long, Long> flush() {
   long now = System.currentTimeMillis();
   long nowKey = bucketKey(now, LOG_BUCKET_INTERVAL);

   Map<Long, Long> flushedValues = new HashMap<Long, Long>();
   for (Long key : new TreeSet<Long>(buckets.keySet())) {
      if (key != nowKey) {
         AtomicLong bucket = buckets.get(key);
         if (null != value) {
            synchronized(bucket) {
               buckets.remove(key);
               long databaseKey = databaseKey(key);
               long n = bucket.get()
               if (!flushedValues.containsKey(databaseKey)) {
                  flushedValues.put(databaseKey, n);
               } else {
                  long sum = flushedValues.get(databaseKey) + n;
                  flushedValues.put(databaseKey, sum);
               }
            }
         }
      }
   }

   return flushedValues;
}

我希望这对可能遇到相同问题的其他人有用。

于 2010-11-10T12:36:16.100 回答
0

您提供的两个代码片段很好,因为它们是。您所做的类似于使用 Guava 的MapMaker.makeComputingMap()进行延迟实例化的工作方式,但我认为延迟创建键的方式没有问题。

顺便说一句,线程完全有可能在查找锁定对象之后但进入同步之前被抢占。get()

我的问题是您的比赛条件描述中的第三个要点。你说:

线程 2:从地图中移除对象,退出同步块

哪个对象,哪个地图?一般来说,我假设您正在查找要锁定的密钥,然后将在同步块内对其他数据结构执行一些其他操作。如果您正在谈论从开头提到的 ConcurrentHashMap 中删除锁定对象,那将是一个巨大的差异。

真正的问题是这是否有必要。在通用环境中,我认为只记住曾经查找过的所有键的所有锁定对象不会有任何内存问题(即使这些键不再代表活动对象)。想出某种方法来安全地处理可能随时存储在其他线程的局部变量中的对象要困难得多,如果你确实想走这条路,我感觉性能会下降到围绕键查找的单个粗锁。

如果我误解了那里发生的事情,请随时纠正我。

编辑:好的——在这种情况下,我坚持我的上述说法,即最简单的方法不是移除钥匙;这实际上可能不像您想象的那样有问题,因为空间增长的速度将非常小。根据我的计算(这很可能是错误的,我不是空间计算方面的专家,并且您的 JVM 可能会有所不同)地图以大约 14Kb/小时的速度增长。在这张地图用完 100MB 的堆空间之前,您必须有一年的连续正常运行时间。

但是让我们假设确实需要删除密钥。这带来了一个问题,即您无法删除密钥,直到您知道没有线程正在使用它。这导致了先有鸡还是先有蛋的问题,您将要求所有线程在其他东西上同步,以便获得原子性(检查)和跨线程的可见性,这意味着您除了拍围绕整个事物的单个同步块,完全颠覆您的锁条带策略。

让我们重新审视一下约束。这里的主要事情是事情最终会得到澄清。这不是正确性限制,而只是内存问题。因此,我们真正想做的是确定某个点肯定不再使用该键,然后使用它作为触发器将其从地图中删除。这里有两种情况:

  1. 您可以识别这种情况,并对其进行逻辑测试。在这种情况下,您可以使用(在最坏的情况下)某种计时器线程,或者希望一些与您的应用程序更干净集成的逻辑来从地图中删除键。
  2. 您无法确定您知道不再使用密钥的任何条件。在这种情况下,根据定义,从地图中删除键是不安全的。所以事实上,为了正确起见,你必须把它们留在里面。

无论如何,这实际上归结为手动垃圾收集。当您可以懒惰地确定不再使用它们时,从地图中删除键。您当前的解决方案在这里过于急切,因为(正如您所指出的)它在这种情况发生之前进行了删除。

于 2010-11-09T15:54:31.840 回答