35

我正在寻找一个 Java Executor,它允许我指定限制/吞吐量/步调限制,例如,一秒钟内最多可以处理 100 个任务——如果提交了更多任务,它们应该排队并稍后执行。这样做的主要目的是避免在访问外部 API 或服务器时遇到限制。

我想知道是基本Java(我怀疑,因为我检查过)还是其他可靠的地方(例如Apache Commons)提供了这个,或者我是否必须自己编写。最好是轻量级的。我不介意自己写,但如果有一个“标准”版本,我至少想先看看它。

4

6 回答 6

39

看看番石榴RateLimiter

限速器。从概念上讲,速率限制器以可配置的速率分配许可。如有必要,每个 acquire() 都会阻塞,直到获得许可,然后再接受它。获得许可后,无需发放许可证。速率限制器通常用于限制访问某些物理或逻辑资源的速率。这与限制并发访问数量而不是速率的信号量形成对比(请注意,并发和速率密切相关,例如参见利特尔定律)。

它的线程安全,但仍然@Beta. 无论如何可能值得一试。

您必须Executor针对速率限制器将每个调用包装起来。对于更干净的解决方案,您可以为ExecutorService.

从javadoc:

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }
于 2013-11-06T19:28:58.103 回答
8

Java Executor 没有提供这样的限制,只有线程数量的限制,这不是您想要的。

一般来说,Executor 无论如何都是限制此类操作的错误位置,它应该在 Thread 尝试调用外部服务器的那一刻。例如,您可以通过让线程在提交请求之前等待的限制信号量来做到这一点。

调用线程:

public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

同时您安排一个(单个)辅助线程定期(如每 60 秒)释放获取的资源:

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }
于 2013-11-06T18:42:37.367 回答
4

一秒钟内最多可以处理 100 个任务——如果提交了更多任务,它们应该排队并稍后执行

你需要调查一下Executors.newFixedThreadPool(int limit)。这将允许您限制可以同时执行的线程数。如果您提交多个线程,它们将被排队并稍后执行。

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);  
...  

上面的片段显示了如何使用ExecutorService允许同时执行不超过 100 个线程的线程。

更新:
查看评论后,这是我想出的(有点愚蠢)。手动跟踪要执行的线程怎么样?如何首先将它们存储在一个中,然后根据在最后一秒内已经执行了多少线程ArrayList将它们提交给。Executor
因此,假设有 200 个任务已提交到我们维护ArrayListExecutor. 当第二次通过时,我们可以根据已经完成的线程数添加更多线程Executor,依此类推

于 2013-11-06T18:30:27.987 回答
1

根据场景的不同,正如之前的回复之一所建议的那样,ThreadPoolExecutor 的基本功能可能会起到作用。

但是,如果线程池由多个客户端共享,并且您想要节流,以限制每个客户端的使用,确保一个客户端不会使用所有线程,那么 BoundedExecutor 将完成这项工作。

可以在以下示例中找到更多详细信息:

http://jcip.net/listings/BoundedExecutor.java

于 2016-06-28T16:00:26.530 回答
0

我个人觉得这个场景很有趣。就我而言,我想强调的是,节流的有趣阶段是消费端,就像经典的生产者/消费者并发理论中一样。这与之前的一些建议答案相反。也就是说,我们不想阻塞提交线程,而是基于速率(任务/秒)策略阻塞消费线程。因此,即使队列中有任务准备就绪,执行/消费线程也可能会阻塞等待满足节流策略。

也就是说,我认为 Executors.newScheduledThreadPool(int corePoolSize) 是一个不错的候选者。这样,您需要在执行程序前面有一个简单的队列(一个简单的 LinkedBlockingQueue 就可以了),然后安排一个周期性任务从队列中挑选实际任务(ScheduledExecutorService.scheduleAtFixedRate)。因此,这不是一个简单的解决方案,但如果您尝试像之前讨论的那样限制消费者,它应该会执行足够的 goog。

于 2015-10-15T16:16:02.833 回答
0

可以将其限制在 Runnable 内:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

取自JAVA Thread Debounce and Throttle

于 2019-09-20T01:17:17.810 回答