35

我正在寻找一种基于它接收到的参数来同步方法的方法,如下所示:

public synchronized void doSomething(name){
//some code
}

我希望方法doSomething基于这样的name参数同步:

线程 1:doSomething("a");

线程 2:doSomething("b");

线程 3:doSomething("c");

线程 4:doSomething("a");

线程 1 、线程 2 和线程 3 将在不同步的情况下执行代码,但线程 4 将等待线程 1 完成代码,因为它具有相同的“a”值。

谢谢

更新

根据 Tudor 的解释,我认为我面临另一个问题:这是新代码的示例:

private HashMap locks=new HashMap();
public void doSomething(String name){
    locks.put(name,new Object());
    synchronized(locks.get(name)) {
        // ...
    }
    locks.remove(name);
}

我不填充锁映射的原因是因为 name 可以有任何值。

根据上面的示例,由于 HashMap 不是线程安全的,因此多个线程同时在 hashmap 中添加/删除值时会出现问题。

所以我的问题是,如果我使HashMapaConcurrentHashMap是线程安全的,同步块会阻止其他线程访问 locks.get(name) 吗?

4

8 回答 8

24

使用映射将字符串与锁对象相关联:

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());
// etc.

然后:

public void doSomething(String name){
    synchronized(locks.get(name)) {
        // ...
    }
}
于 2012-09-16T20:23:46.240 回答
21

TL;博士:

我使用Spring Framework 中的ConcurrentReferenceHashMap。请检查下面的代码。


虽然这条线很老了,但它仍然很有趣。因此,我想与 Spring Framework 分享我的方法。

我们试图实现的东西叫做mutex/lock。正如Tudor 的回答所建议的那样,这个想法是有一个Map来存储锁名称和锁对象。代码如下所示(我从他的回答中复制了它):

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());

但是,这种方法有两个缺点:

  1. OP已经指出了第一个:如何同步对locks哈希映射的访问?
  2. 如何删除一些不再需要的锁?否则,locks哈希图将继续增长。

第一个问题可以通过使用ConcurrentHashMap来解决。对于第二个问题,我们有两个选择:手动检查并从映射中删除锁,或者以某种方式让垃圾收集器知道哪些锁不再使用并且 GC 将删除它们。我会选择第二种方式。

当我们使用HashMap, orConcurrentHashMap时,它会创建强引用。要实现上面讨论的解决方案,应该改用弱引用(要了解什么是强/弱引用,请参阅这篇文章这篇文章)。


因此,我使用Spring Framework 中的ConcurrentReferenceHashMap。如文档中所述:

ConcurrentHashMap对键和值都使用软引用或弱引用的A。

此类可以用作替代方案 Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>()),以便在并发访问时支持更好的性能。ConcurrentHashMap除了支持空值和空键外,此实现遵循相同的设计约束 。

这是我的代码。MutexFactory管理所有锁的是<K>钥匙的类型。

@Component
public class MutexFactory<K> {

    private ConcurrentReferenceHashMap<K, Object> map;

    public MutexFactory() {
        this.map = new ConcurrentReferenceHashMap<>();
    }

    public Object getMutex(K key) {
        return this.map.compute(key, (k, v) -> v == null ? new Object() : v);
    }
}

用法:

@Autowired
private MutexFactory<String> mutexFactory;

public void doSomething(String name){
    synchronized(mutexFactory.getMutex(name)) {
        // ...
    }
}

单元测试(这个测试使用了一些方法的等待库,例如await(), atMost(), until()):

public class MutexFactoryTests {
    private final int THREAD_COUNT = 16;

    @Test
    public void singleKeyTest() {
        MutexFactory<String> mutexFactory = new MutexFactory<>();
        String id = UUID.randomUUID().toString();
        final int[] count = {0};

        IntStream.range(0, THREAD_COUNT)
                .parallel()
                .forEach(i -> {
                    synchronized (mutexFactory.getMutex(id)) {
                        count[0]++;
                    }
                });
        await().atMost(5, TimeUnit.SECONDS)
                .until(() -> count[0] == THREAD_COUNT);
        Assert.assertEquals(count[0], THREAD_COUNT);
    }
}
于 2019-03-28T19:15:55.000 回答
13

Tudor 的答案很好,但它是静态的,不可扩展。我的解决方案是动态的和可扩展的,但它的实现复杂性增加了。外界可以像使用 a 一样使用这个类Lock,因为这个类实现了接口。您可以通过工厂方法获得参数化锁的实例getCanonicalParameterLock

package lock;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class ParameterLock implements Lock {

    /** Holds a WeakKeyLockPair for each parameter. The mapping may be deleted upon garbage collection
     * if the canonical key is not strongly referenced anymore (by the threads using the Lock). */
    private static final Map<Object, WeakKeyLockPair> locks = new WeakHashMap<>();

    private final Object key;
    private final Lock lock;

    private ParameterLock (Object key, Lock lock) {
        this.key = key;
        this.lock = lock;
    }

    private static final class WeakKeyLockPair {
        /** The weakly-referenced parameter. If it were strongly referenced, the entries of
         * the lock Map would never be garbage collected, causing a memory leak. */
        private final Reference<Object> param;
        /** The actual lock object on which threads will synchronize. */
        private final Lock lock;

        private WeakKeyLockPair (Object param, Lock lock) {
            this.param = new WeakReference<>(param);
            this.lock = lock;
        }
    }

    public static Lock getCanonicalParameterLock (Object param) {
        Object canonical = null;
        Lock lock = null;

        synchronized (locks) {
            WeakKeyLockPair pair = locks.get(param);            
            if (pair != null) {                
                canonical = pair.param.get(); // could return null!
            }
            if (canonical == null) { // no such entry or the reference was cleared in the meantime                
                canonical = param; // the first thread (the current thread) delivers the new canonical key
                pair = new WeakKeyLockPair(canonical, new ReentrantLock());
                locks.put(canonical, pair);
            }
        }

        // the canonical key is strongly referenced now...
        lock = locks.get(canonical).lock; // ...so this is guaranteed not to return null
        // ... but the key must be kept strongly referenced after this method returns,
        // so wrap it in the Lock implementation, which a thread of course needs
        // to be able to synchronize. This enforces a thread to have a strong reference
        // to the key, while it isn't aware of it (as this method declares to return a 
        // Lock rather than a ParameterLock).
        return new ParameterLock(canonical, lock);               
    }

    @Override
    public void lock() {
        lock.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        return lock.tryLock();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(time, unit);
    }

    @Override
    public void unlock() {
        lock.unlock();
    }

    @Override
    public Condition newCondition() {
        return lock.newCondition();
    }
}

当然,您需要一个给定参数的规范键,否则线程将不会同步,因为它们将使用不同的锁。规范化相当于 Tudor 解决方案中字符串的内部化。哪里String.intern()本身是线程安全的,我的“规范池”不是,所以我需要在 WeakHashMap 上进行额外的同步。

此解决方案适用于任何类型的对象。但是,请确保在自定义类中正确实现equalshashCode正确实现,因为如果不这样做,将出现线程问题,因为多个线程可能使用不同的 Lock 对象进行同步!

WeakHashMap 的选择是由它带来的内存管理的便利性来解释的。不然怎么知道没有线程在使用特定的锁呢?如果这可以知道,你怎么能安全地从地图中删除条目?您需要在删除时进行同步,因为在想要使用锁的到达线程和从映射中删除锁的操作之间存在竞争条件。所有这些事情都是通过使用弱引用来解决的,所以 VM 会为您完成工作,这大大简化了实现。如果你检查了 WeakReference 的 API,你会发现依赖弱引用是线程安全的。

现在检查这个测试程序(由于某些字段的私有可见性,您需要从 ParameterLock 类内部运行它):

public static void main(String[] args) {
    Runnable run1 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Runnable run2 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Thread t1 = new Thread(run1);
    Thread t2 = new Thread(run2);

    t1.start();
    t2.start();

    try {
        t1.join();
        t2.join();
        while (locks.size() != 0) {
            System.gc();
            System.out.println(locks);
        }
        System.out.println("FINISHED!");
    } catch (InterruptedException ex) {
        // those threads won't be interrupted
    }
}

private static void sync (Object param) {
    Lock lock = ParameterLock.getCanonicalParameterLock(param);
    lock.lock();
    try {
        System.out.println("Thread="+Thread.currentThread().getName()+", lock=" + ((ParameterLock) lock).lock);
        // do some work while having the lock
    } finally {
        lock.unlock();
    }        
}

您很有可能会看到两个线程都在使用同一个锁对象,因此它们是同步的。示例输出:

Thread=Thread-0, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-0]
Thread=Thread-1, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-1]
FINISHED!

但是,有可能这两个线程在执行中不重叠,因此不需要它们使用相同的锁。通过在正确的位置设置断点,您可以轻松地在调试模式下强制执行此行为,强制第一个或第二个线程在必要时停止。您还会注意到,在主线程上的垃圾收集之后,WeakHashMap 将被清除,这当然是正确的,因为主线程通过调用等待两个工作线程完成它们的工作Thread.join()在调用垃圾收集器之前。这确实意味着在工作线程中不再存在对 (Parameter)Lock 的强引用,因此可以从弱 hashmap 中清除该引用。如果另一个线程现在想要在同一个参数上同步,一个新的 Lock 将在同步部分中创建getCanonicalParameterLock

现在用任何具有相同规范表示的对重复测试(= 它们相等,所以a.equals(b)),看看它仍然有效:

sync("a");
sync(new String("a"))

sync(new Boolean(true));
sync(new Boolean(true));

等等

基本上,此类为您提供以下功能:

  • 参数化同步
  • 封装内存管理
  • 使用任何类型的对象的能力(在正确实施的条件下equalshashCode
  • 实现锁接口

此 Lock 实现已通过同时修改 ArrayList 进行测试,其中 10 个线程迭代 1000 次,执行以下操作:添加 2 个项目,然后通过迭代完整列表删除最后找到的列表条目。每次迭代都会请求一个锁,因此总共将请求 10*1000 个锁。没有抛出 ConcurrentModificationException,并且在所有工作线程完成后,项目总数为 10*1000。在每一次修改中,调用都会请求一个锁ParameterLock.getCanonicalParameterLock(new String("a")),因此使用一个新的参数对象来测试规范化的正确性。

请注意,您不应该对参数使用字符串文字和原始类型。由于 String 字面量是自动实习的,它们总是有一个强引用,因此如果第一个线程到达时带有一个 String 字面量作为其参数,那么锁池将永远不会从条目中释放出来,这就是内存泄漏。自动装箱原语也是如此:例如,Integer 有一个缓存机制,该机制将在自动装箱过程中重用现有的 Integer 对象,这也会导致强引用的存在。然而,解决这个问题,这是一个不同的故事。

于 2015-02-05T15:29:48.917 回答
1

看看这个框架。看来你正在寻找这样的东西。

public class WeatherServiceProxy {
...
private final KeyLockManager lockManager = KeyLockManagers.newManager();

public void updateWeatherData(String cityName, Date samplingTime, float temperature) {
        lockManager.executeLocked(cityName, new LockCallback() {
                public void doInLock() {
                        delegate.updateWeatherData(cityName, samplingTime, temperature);
                }
        });
}

https://code.google.com/p/jkeylockmanager/

于 2014-07-16T12:39:26.097 回答
1

我基于 McDowell 的IdMutexProvider创建了一个 tokenProvider 。管理器使用一个WeakHashMap负责清理未使用的锁。

你可以在这里找到我的实现。

于 2015-02-26T10:23:12.207 回答
1

我通过另一个stackoverflow问题找到了正确的答案:如何通过钥匙获取锁

我在这里复制了答案:

Guava 在 13.0 中发布了类似的东西;如果你愿意,你可以把它从 HEAD 中取出。

Striped 或多或少分配了特定数量的锁,然后根据它们的哈希码将字符串分配给锁。API看起来或多或少像

Striped<Lock> locks = Striped.lock(stripes);
Lock l = locks.get(string);
l.lock();
try {
  // do stuff 
} finally {
  l.unlock();
}

或多或少,可控制的条带数量让您可以用内存使用来交换并发性,因为为每个字符串键分配一个完整的锁可能会很昂贵;本质上,只有在遇到哈希冲突时才会出现锁争用,而哈希冲突(可以预见)很少见。

于 2017-07-28T11:29:12.920 回答
0

我使用缓存来存储锁定对象。我的缓存将在一段时间后过期对象,这实际上只需要比同步进程运行所需的时间更长

`

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

...

private final Cache<String, Object> mediapackageLockCache = CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();

...

public void doSomething(foo) {
    Object lock = mediapackageLockCache.getIfPresent(foo.toSting());
    if (lock == null) {
        lock = new Object();
        mediapackageLockCache.put(foo.toString(), lock);
    }

    synchronized(lock) {
        // execute code on foo
        ...
    }
}

`

于 2016-10-06T08:59:36.110 回答
0

我有一个更简单、可扩展的实现,类似于@timmons post 利用 guava LoadingCachewith weakValues. 您将需要阅读有关“平等”的帮助文件以了解我提出的建议。

定义以下 weakValued 缓存。

private final LoadingCache<String,String> syncStrings = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<String, String>() {
    public String load(String x) throws ExecutionException {
        return new String(x);
    }
});

public void doSomething(String x) {
      x = syncStrings.get(x);
      synchronized(x) {
          ..... // whatever it is you want to do
      }
}

现在!作为 JVM 的结果,我们不必担心缓存会变得太大,它只会在必要时保存缓存的字符串,而垃圾管理器/guava 会完成繁重的工作。

于 2017-10-21T06:49:09.397 回答