我正在寻找一个 Java Executor,它允许我指定限制/吞吐量/步调限制,例如,一秒钟内最多可以处理 100 个任务——如果提交了更多任务,它们应该排队并稍后执行。这样做的主要目的是避免在访问外部 API 或服务器时遇到限制。
我想知道是基本Java(我怀疑,因为我检查过)还是其他可靠的地方(例如Apache Commons)提供了这个,或者我是否必须自己编写。最好是轻量级的。我不介意自己写,但如果有一个“标准”版本,我至少想先看看它。
我正在寻找一个 Java Executor,它允许我指定限制/吞吐量/步调限制,例如,一秒钟内最多可以处理 100 个任务——如果提交了更多任务,它们应该排队并稍后执行。这样做的主要目的是避免在访问外部 API 或服务器时遇到限制。
我想知道是基本Java(我怀疑,因为我检查过)还是其他可靠的地方(例如Apache Commons)提供了这个,或者我是否必须自己编写。最好是轻量级的。我不介意自己写,但如果有一个“标准”版本,我至少想先看看它。
看看番石榴RateLimiter:
限速器。从概念上讲,速率限制器以可配置的速率分配许可。如有必要,每个 acquire() 都会阻塞,直到获得许可,然后再接受它。获得许可后,无需发放许可证。速率限制器通常用于限制访问某些物理或逻辑资源的速率。这与限制并发访问数量而不是速率的信号量形成对比(请注意,并发和速率密切相关,例如参见利特尔定律)。
它的线程安全,但仍然@Beta
. 无论如何可能值得一试。
您必须Executor
针对速率限制器将每个调用包装起来。对于更干净的解决方案,您可以为ExecutorService
.
从javadoc:
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
Java Executor 没有提供这样的限制,只有线程数量的限制,这不是您想要的。
一般来说,Executor 无论如何都是限制此类操作的错误位置,它应该在 Thread 尝试调用外部服务器的那一刻。例如,您可以通过让线程在提交请求之前等待的限制信号量来做到这一点。
调用线程:
public void run() {
// ...
requestLimiter.acquire();
connection.send();
// ...
}
同时您安排一个(单个)辅助线程定期(如每 60 秒)释放获取的资源:
public void run() {
// ...
requestLimiter.drainPermits(); // make sure not more than max are released by draining the Semaphore empty
requestLimiter.release(MAX_NUM_REQUESTS);
// ...
}
一秒钟内最多可以处理 100 个任务——如果提交了更多任务,它们应该排队并稍后执行
你需要调查一下Executors.newFixedThreadPool(int limit)
。这将允许您限制可以同时执行的线程数。如果您提交多个线程,它们将被排队并稍后执行。
ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 = threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);
...
上面的片段显示了如何使用ExecutorService
允许同时执行不超过 100 个线程的线程。
更新:
查看评论后,这是我想出的(有点愚蠢)。手动跟踪要执行的线程怎么样?如何首先将它们存储在一个中,然后根据在最后一秒内已经执行了多少线程ArrayList
将它们提交给。Executor
因此,假设有 200 个任务已提交到我们维护ArrayList
的Executor
. 当第二次通过时,我们可以根据已经完成的线程数添加更多线程Executor
,依此类推
根据场景的不同,正如之前的回复之一所建议的那样,ThreadPoolExecutor 的基本功能可能会起到作用。
但是,如果线程池由多个客户端共享,并且您想要节流,以限制每个客户端的使用,确保一个客户端不会使用所有线程,那么 BoundedExecutor 将完成这项工作。
可以在以下示例中找到更多详细信息:
我个人觉得这个场景很有趣。就我而言,我想强调的是,节流的有趣阶段是消费端,就像经典的生产者/消费者并发理论中一样。这与之前的一些建议答案相反。也就是说,我们不想阻塞提交线程,而是基于速率(任务/秒)策略阻塞消费线程。因此,即使队列中有任务准备就绪,执行/消费线程也可能会阻塞等待满足节流策略。
也就是说,我认为 Executors.newScheduledThreadPool(int corePoolSize) 是一个不错的候选者。这样,您需要在执行程序前面有一个简单的队列(一个简单的 LinkedBlockingQueue 就可以了),然后安排一个周期性任务从队列中挑选实际任务(ScheduledExecutorService.scheduleAtFixedRate)。因此,这不是一个简单的解决方案,但如果您尝试像之前讨论的那样限制消费者,它应该会执行足够的 goog。
可以将其限制在 Runnable 内:
public static Runnable throttle (Runnable realRunner, long delay) {
Runnable throttleRunner = new Runnable() {
// whether is waiting to run
private boolean _isWaiting = false;
// target time to run realRunner
private long _timeToRun;
// specified delay time to wait
private long _delay = delay;
// Runnable that has the real task to run
private Runnable _realRunner = realRunner;
@Override
public void run() {
// current time
long now;
synchronized (this) {
// another thread is waiting, skip
if (_isWaiting) return;
now = System.currentTimeMillis();
// update time to run
// do not update it each time since
// you do not want to postpone it unlimited
_timeToRun = now+_delay;
// set waiting status
_isWaiting = true;
}
try {
Thread.sleep(_timeToRun-now);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// clear waiting status before run
_isWaiting = false;
// do the real task
_realRunner.run();
}
}};
return throttleRunner;
}