3

我正在尝试实现一个资源处理程序类,该类将资源(字符串,存储在数组中)分配给多个客户端,这些客户端可以尝试获取一组资源的锁并通过 lock 方法给出的 ID 解锁它们。

我正在尝试使用公平的 ReentrantReadWriteLock-s,每个资源一个。

我只看到客户端的日志。

有几个问题,有时一个线程不会停止请求和获取资源,有时会发生死锁,有时释放锁失败。任何提示表示赞赏。

public class ResHandler {

//ID-s of the granted resource lists
private static long lockNum = 0;

//Resources are identified by strings, each client has a list of demanded resources
//we store these when granted, along with an ID
private static ConcurrentHashMap<Long, Set<String>> usedResources 
    = new ConcurrentHashMap<Long, Set<String>>();

//We store a lock for each resource
private static ConcurrentHashMap<String, ReentrantReadWriteLock> resources 
    = new ConcurrentHashMap<String, ReentrantReadWriteLock>();

//Filling our resources map with the resources and their locks
static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
        String res = SharedValues.RESOURCE_LIST[i];
        //Fair reentrant lock
        ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
        resources.put(res, lc);
    }
}

//We get a set of the required resources and the type of lock we have to use
public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    //!!!
    if (mMethod == SharedValues.READ_METHOD) {

        //We try to get the required resources
        for (String mn : mNeededRes)
            resources.get(mn).readLock().lock();

        //After grandted, we put them in the usedResources map
        ++lockNum;
        usedResources.put(lockNum, mNeededRes);
        return lockNum;         
    }

    //Same thing, but with write locks
    else {

        for (String mn : mNeededRes)
            resources.get(mn).writeLock().lock();

        ++lockNum;
        usedResources.put(lockNum, mNeededRes);
        return lockNum;         
    }
}

//Releasing a set of locks by the set's ID
public static void releaseLock(long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
        System.out.println("returned, no such key as: " + mLockID);
        return; 
    }

    Set<String> toBeReleased = usedResources.get(mLockID);

    //Unlocking every lock from this set
    for (String s : toBeReleased) {
        if (resources.get(s).isWriteLockedByCurrentThread())
            resources.get(s).writeLock().unlock();
        else 
            resources.get(s).readLock().unlock();
    }

    //Deleting from the map
    usedResources.remove(mLockID);
}   
}
4

4 回答 4

2

您的程序中有几个问题是导致锁定和错误的原因:

  • 一般来说:将全局变量声明为final。你不想不小心惹到他们。此外,这允许您将它们用作同步对象。

  • long 不保证是原子的,运算符 ++ 也不是。32 位 JVM 必须分 2 步编写它,因此理论上可能会导致系统出现重大故障。最好使用 AtomicLong。

  • getLock 不是线程安全的。例子:

线程 A 为资源 1、3、5
调用 getLock 线程 B 同时为资源 2、5、3 调用 getLock
线程 A 在 1、3 上被授予锁定,然后暂停
线程 B 在 2 上被授予锁定,然后将 5 置于暂停状态
线程 A 现在等待来自线程 B 的 5,而线程 B 现在等待来自线程 A 的 3。
死锁。

请注意,释放方法不需要同步,因为它不能锁定任何其他线程。

  • ++lockNum如果同时调用,将导致两个线程弄乱它们的锁值,因为这是一个全局变量。

这是处于工作状态的代码:

  private static final AtomicLong lockNum = new AtomicLong(0);
  private static final ConcurrentHashMap<Long, Set<String>> usedResources = new ConcurrentHashMap<Long, Set<String>>();
  private static final ConcurrentHashMap<String, ReentrantReadWriteLock> resources = new ConcurrentHashMap<String, ReentrantReadWriteLock>();

  static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
      String res = SharedValues.RESOURCE_LIST[i];
      ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
      resources.put(res, lc);
    }
  }

  public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    synchronized (resources) {
      if (mMethod == SharedValues.READ_METHOD) {
        for (String mn : mNeededRes) {
          resources.get(mn).readLock().lock();
        }
      } else {
        for (String mn : mNeededRes) {
          resources.get(mn).writeLock().lock();
        }
      }
    }
    final long lockNumber = lockNum.getAndIncrement();
    usedResources.put(lockNumber, mNeededRes);
    return lockNumber;
  }

  public static void releaseLock(final long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
      System.out.println("returned, no such key as: " + mLockID);
      return;
    }

    final Set<String> toBeReleased = usedResources.remove(mLockID);

    for (String s : toBeReleased) {
      final ReentrantReadWriteLock lock = resources.get(s);
      if (lock.isWriteLockedByCurrentThread()) {
        lock.writeLock().unlock();
      } else {
        lock.readLock().unlock();
      }
    }
  }
于 2013-10-13T20:05:49.513 回答
0

我假设不同的客户端可以从不同的线程调用 getLock。如果是这样,那么第一个问题是对 lockNum 的访问不同步。两个线程可能同时调用 getLock,因此根据时间的不同,它们可能最终都返回相同的锁号。这可以解释为什么释放锁有时会失败。

如果你能解决这个问题,应该更容易弄清楚还有什么问题。

于 2013-10-12T20:24:23.993 回答
0

为了避免死锁,您的资源必须以相同的顺序获取,因此您必须Set<String> mNeededRes在循环执行锁定之前进行排序。排序方法并不重要。

这在第 10 章中有详细描述。Java Concurrency In Practice Brian Göetz避免活体危害

我建议您删除getLockreleaseLock/或将它们设为私有。并将所有逻辑包装到Runnable. 如果您控制所有锁,则无法忘记释放它们。做这样的事情:

public void performMethod(List<String> mNeededRes, boolean mMethod, Runnable r){
    List sortd = Collections.sort(mNeededRes);
    try{
        getLock(mNeededRes, mMethod);
        r.run();
    }finally {
        releaseLock(mNeededRes);
    }
}
于 2013-10-13T16:45:57.013 回答
0

更新的解决方案,试一试:

public class ResHandler {

private static AtomicLong lockNum = new AtomicLong(0);
private static Map<Long, Set<String>> usedResources = new ConcurrentHashMap<Long, Set<String>>();
private static final Map<String, ReentrantReadWriteLock> resources = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
// "priorityResources" to avoid deadlocks and starvation
private static final Map<String, PriorityBlockingQueue<Long>> priorityResources = new ConcurrentHashMap<String, PriorityBlockingQueue<Long>>();

static {
    for (int i = 0; i < SharedValues.RESOURCE_LIST.length; ++i) {
        String res = SharedValues.RESOURCE_LIST[i];
        ReentrantReadWriteLock lc = new ReentrantReadWriteLock(true);
        resources.put(res, lc);
        priorityResources.put(res, new PriorityBlockingQueue<Long>());
    }
}

public static long getLock(Set<String> mNeededRes, boolean mMethod) {
    long lockNumLocal = lockNum.addAndGet(1);
    for (String mn : mNeededRes) {
        priorityResources.get(mn).offer(lockNumLocal);
    }
    boolean tryLockResult;
    List<String> lockedList = new ArrayList<String>();
    boolean allLocked = false;
    while (!allLocked) {
        allLocked = true;
        for (String mn : mNeededRes) {
            if (lockedList.contains(mn) == true) {
                continue;//because we already have the lock
            }
            try {
                if (mMethod == SharedValues.READ_METHOD) {
                    tryLockResult = resources.get(mn).readLock().tryLock(1, TimeUnit.MILLISECONDS);
                } else {
                    tryLockResult = resources.get(mn).writeLock().tryLock(1, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(ResHandler.class.getName()).log(Level.SEVERE, null, ex);
                tryLockResult = false;
            }

            if (tryLockResult) {
                lockedList.add(mn);
            } else {
                allLocked = false;
                for (int i = lockedList.size() - 1; i >= 0; i--) {
                    //if the lock failed, all previous locked resources need to be released, but only if they will be used by higher priority lock operations
                    if (priorityResources.get(lockedList.get(i)).peek() != lockNumLocal) {
                        if (mMethod == SharedValues.READ_METHOD) {
                            resources.get(lockedList.get(i)).readLock().unlock();
                        } else {
                            resources.get(lockedList.get(i)).writeLock().unlock();
                        }
                        lockedList.remove(i);
                    }
                }
                break;
            }
        }
    }
    usedResources.put(lockNumLocal, mNeededRes);
    for (String mn : mNeededRes) {
        priorityResources.get(mn).remove(lockNumLocal);
    }
    return lockNumLocal;
}

public static void releaseLock(long mLockID) {
    if (!usedResources.containsKey(mLockID)) {
        System.out.println("returned, no such key as: " + mLockID);
        return;
    }

    Set<String> toBeReleased = usedResources.get(mLockID);

    //Unlocking every lock from this set
    for (String s : toBeReleased) {
        if (resources.get(s).isWriteLockedByCurrentThread()) {
            resources.get(s).writeLock().unlock();
        } else {
            resources.get(s).readLock().unlock();
        }
    }

    //Deleting from the map
    usedResources.remove(mLockID);
}

}

于 2013-10-11T23:35:22.227 回答