3

所以这段代码:

int usedPermits = totalPermits - semaphore.availablePermits();
semaphore.release(usedPermits);

不是线程安全的,因为如果在两行之间另一个线程释放了一个许可,信号量的容量实际上会增加到其原始最大值之上。

这适用于我的情况,因为这条代码是 1) 单线程和 2) 释放许可证的唯一位置,这可能只是说明“全部释放”和“获取/释放”是两种不兼容的设计模式的事实在同一个物体上。

但是,我想问一下是否存在具有不那么微妙的线程同步策略的首选模式。

4

4 回答 4

3

正如其他答案中所解释的,有一种解决方案可以使您的代码具有原子性。但是,不能保证它会在一般设置中解决问题,因为此代码不可能在所有情况下都是正确的。

许可证要么由使用它们的活动释放,在这种情况下代码是多余的:信号量将自然补充,或者它们不会,在这种情况下你将需要那段代码,它是安全的,只要你不会用它做任何奇怪的事情。

请注意,您声明您希望限制每个时间段(此处为一分钟)的活动速率,但您必须考虑到活动可能持续超过一分钟。您可以在此处限制两种不同的事情:

  • 每个量程开始的活动数量,
  • 在一个量程中运行的活动数量

如果您希望限制第一个,那么您将需要您的代码来重新填写许可证,并让活动保留他们的许可证。如果要处理第二种情况,则必须强制活动在启动和终止时分别获取和释放其许可。

如果您担心某些活动滥用信号量,请禁止在活动代码本身中使用它。实际上,速率限制与活动语义完全正交,最好将该功能与活动主代码分开。因此,您应该使用代码包装任何计划的活动来处理信号量:

class RateLimitedRunnable implements Runnable {
    Runnable runnable;
    RateLimitedRunnable(Runnable r) { runnable = r; }
    void Run() {
        semaphore.acquire();
        runnable.run();
        semaphore.release(); // remove if only limiting starts
    }
}

上面的示例(未经测试)代码描述了远离实际活动的信号量使用的可能处理,从而消除了任何潜在的误用。如果内部活动需要访问信号量,它应该只检索其当前状态,并且可以设计一个 ad-hoc 接口来提供这种有限的访问。

注意:我在这里使用“活动”一词作为线程或进程的意思,因为关于信号量使用的讨论比Java的上下文更普遍。

于 2013-02-20T14:03:55.093 回答
0

将某些东西混入 Semaphore 有点奇怪,如果它甚至可行的话。它还会导致突发性 - 如果您限制为 10k QPS,每秒,您将重置并一次获得 10000 个查询。为什么不使用现有的 RateLimiter 实现?:

http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/RateLimiter.html

或者至少看一下代码以获得灵感:

http://code.google.com/p/guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/RateLimiter.java?name=v13.0-rc2

于 2013-02-20T02:02:42.937 回答
0

为什么不扩展标准的 Semaphore 类并覆盖它的方法,但让它们同步:

public class Semaphore extends java.util.concurrent.Semaphore {
public Semaphore(int permits)
{
    super(permits);
}

public synchronized void releaseAll()
{
    super.release(super.drainPermits());
}   
}
于 2013-09-03T17:46:31.513 回答
0

不是线程安全的,因为如果在两行之间另一个线程释放了一个许可,信号量的容量实际上会增加到其原始最大值之上。

这应该通过 解决synchronization,还强制每个都semaphore.release()在一个synchronized块中,例如

synchronize(lock) {
    int usedPermits = totalPermits - semaphore.availablePermits();
    semaphore.release(usedPermits);
}

但是,如果您想确保许可的数量超过,这可能还不够,totalPermits因为即使使用synchronized release all块,如果线程调用releaseafterwordssemaphore将超过totalPermits

您可以实现一个threads调用释放许可而不semaphore.release()直接调用的函数,例如

void limitedRelease(){
  synchronize(lock) {
    if(semaphore.availablePermits() < totalPermits ) {
      semaphore.release();
    }    
  }  
}
于 2013-02-19T23:36:49.310 回答