3

更新:我使用的是 Java 1.6.34,没有机会升级到 Java 7。

我有一个场景,我每分钟只能调用一个方法 80 次。它实际上是由第 3 方编写的服务 API,如果您调用它太多次,它会“关闭”(忽略调用)它的 API:

public class WidgetService {
    // Can only call this method 80x/min, otherwise it
    // it just doesn't do anything
    public void doSomething(Fizz fizz);
}

我想写一个ApiThrottler类,它有一个boolean canRun()方法可以告诉我的Java客户端是否doSomething(Fizz)可以调用该方法。(当然它总是可以调用的,但是如果我们超过了我们的速率,调用它就没有意义了。)

因此,可以让我编写如下代码:

// 80x/min
ApiThrottler throttler = new ApiThrottler(80);

WidgetService widgetService = new DefaultWidgetService();

// Massive list of Fizzes
List<Fizz> fizzes = getFizzes();

for(Fizz fizz : fizzes)
    if(throttler.canRun())
        widgetService.doSomething(fizz);

这不一定是 API ( ApiThrottler#canRun),但是我需要一个可靠/可靠的机制,它会暂停/休眠直到WidgetService#doSomething(Fizz) 可以调用。

这让我觉得我们正在进入使用多线程的领域,这让我觉得我们可以使用某种锁定机制和 Java 通知 ( wait()/ notify()) 模型。但是在这个领域没有经验,我似乎无法理解最优雅的解决方案。提前致谢。

4

4 回答 4

5

最好的选择之一可能是使用 Semaphore http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html类并每分钟给它 80 个许可。例如,这可以通过使用计时器类http://docs.oracle.com/javase/7/docs/api/java/util/Timer.html来完成。调用者线程将在每次通过调用信号量上的 acquire() 执行对服务的调用时消耗许可,如果所有许可都已经耗尽,这将阻塞。

正如您所提到的,当然可以使用等待/通知和带有计时器或单独线程的整数计数器来编写代码,但是与我概述的更现代的 java.util.concurrent API 的使用相比,这将更加复杂多于。

它可以看起来接近以下内容:

class Throttler implements TimerTask {
  final Semaphore s = new Semaphore(80);  
  final Timer timer = new Timer(true);

  Throttler() {
    timer.schedule(this, 0, 60*1000);   //schedule this for 1 min execution  
  }

  run() {  //called by timer 
    s.release(80 - s.availablePermits());
  }

  makeCall() {
    s.acquire();
    doMakeCall();
  }

}

这应该从 Java 5 开始工作。

同样更好的解决方案是使用com.google.common.util.concurrent.RateLimiterGuava。它看起来像这样:

class Throttler {
  final RateLimiter rateLimiter = RateLimiter.create(80.0/60.0);

  makeCall() {
    rateLimiter.acquire();
    doMakeCall();
  }
}

与 Semaphore 解决方案相比,语义略有不同,RateLimiter 很可能更适合您的情况。

于 2013-02-01T01:42:59.777 回答
1

我最近写了类似的东西。唯一的变化是我的代码在函数运行完成时需要回调。所以如果它不能运行,我直接调用回调。

另一个变化是,由于此调用可能是异步的,因此当您第二次调用它时,该调用可能正在进行中。在这种情况下,我只是忽略了电话。

我的节流器有一个辅助函数call,它接受要调用的函数和回调。这是用 C++ 编写的,所以对你来说,这将是 Action 和 listener 接口的形式。

与基于信号量的解决方案相比,这具有序列化请求的优势,因此您也不会过于频繁地调用它

interface Callback{
    public void OnFunctionCalled();
}

class APIThrottler
   //ctor etc
   boolean CanCall();

   public boolean IsInProgress();

   public void SetInProgress(boolean inProgress = true);

   public void Mark(){/*increment counter*/; SetInProgress(false);} // couldnt think of a better name..

   public void Call(Callable event, Callback callback){
        If(IsInProgress())
            return;
        else if(CanCall())
        {
            SetInProgress();
            event.Call();
        }
        else
            callback.OnFunctionCalled();

    }
}

一旦函数回调(或者如果它是同步的,则在函数本身内)Mark()一旦完成,您就需要这样做。

这在很大程度上是我的实现,唯一的区别是我在 x 秒(或几分钟)内处理一次。

于 2013-02-01T01:43:02.783 回答
0

这可以通过使用一个简单的计数器来实现。我们称它为初始化为 80 的“许可”计数器。在调用“函数”之前,首先尝试获取许可。完成后,释放它。然后设置一个重复计时器,将计数器重置为每秒 80。

class Permit extends TimerTask
{
    private Integer permit = 80;

    private synchronized void resetPermit() { this.permit = 80; }

    public synchronized boolean acquire() {
        if(this.permit == 0) return false;
        this.permit--;
        return true;
    }

    public synchronized void release() { this.permit++; }

    @Override public void run() { resetPermit(); }
}

像这样设置计时器以每秒重置一次许可。schedule 方法的第一个参数是 TimerTask 的一个实例(传递上面的 Permit 类对象)。run() 方法将在第二个参数中指定的每个时间段(此处为 1000 毫秒/1 秒)中调用。'true' 参数表示这是一个将不断重复的守护程序计时器。

Timer timer = new Timer(true);
timer.schedule(permit, 1000);

然后每次你需要调用你的函数时,首先检查你是否可以获得任何许可。完成后不要忘记释放它

void myFunction() {
  if(!permit.acquire()) {
      System.out.println("Nah.. been calling this 80x in the past 1 sec");
      return;
  }

  // do stuff here

  permit.release();
}

注意上面 Permit 类方法中 synchronized 关键字的使用——这应该避免超过 1 个线程同时执行同一对象实例的任何方法

于 2013-02-01T02:07:51.170 回答
0

您可以保留时间记录,并确保最后一分钟不超过 80 条记录。

// first=newest, last=oldest
final LinkedList<Long> records = new LinkedList<>();

synchronized public void canRun() throws InterruptedException
{
    while(true)
    {
        long now = System.currentTimeMillis();
        long oneMinuteAgo = now - 60000;

        // remove records older than one minute
        while(records.getLast() < oneMinuteAgo)
            records.removeLast();

        if(records.size()<80) // less than 80 records in the last minute
        {
            records.addFirst(now);
            return;   // can run
        }

        // wait for the oldest record to expire, then check again
        wait( 1 + records.getLast() - oneMinuteAgo);
    }
}

有可能在第 0 秒,我们发出 80 个调用,等待一分钟,然后在第 60 秒,我们又发出 80 个调用。由于两端时钟不精确或网络随机延迟,另一端可能在一分钟内测量到 160 个呼叫。为安全起见,请扩大时间窗口,例如以每 70 秒 80 次调用进行节流。

于 2013-02-01T02:59:55.890 回答