如何使用和的组合ScheduledThreadPoolExecutor
来限制接受可变参数的命令?在收到命令的响应后,我需要根据上述命令的输出创建一个新命令。我还需要遵守每秒 100 个调用的阈值。ScheduledFuture
ExecutorCompletionService
Callable
Callable
Callable
Callable
4 回答
您应该实现漏桶算法。在拨打电话之前,请阻止,直到您有令牌。你可以用几十行 Java 来实现这个算法。
我建议使用代理,例如 RabbitMQ。您可以将消费者的最大数量设置为 100,并拥有一个以每秒 100 条消息的速率发布的 Producer 实例。
在这里,您可以找到在分布式系统中实现节流机制的三种方法的解释。您将有兴趣使用的是使用 RabbitMQ 的分布式。这个旨在限制任何给定时间的并发消息数量,假设在任何给定时间最多 100 条。您需要对其进行修改,以使发布者每秒发布的消息不超过 100 条。在底部,您可以找到带有源代码的 git 存储库的 url,但无论如何,我也将其粘贴在这里。
从评论编辑:
首先使用 java.util.Semaphore,它配置了它将处理的许可数量。每个线程都将尝试获取许可,如果没有许可,则将被阻塞,直到释放一个。第二个使用固定大小的 ThreadPoolExecutor。执行程序在任何给定时间最多将具有指定数量的工作线程。第三个使用 RabbitMQ。并发消费者的最大数量将是工作线程的最大数量。git repo 有更详细的英文解释。希望这可以帮助
If you like re-using code well written and working, you can use the TimedSemaphore
which is part of the commons library of Apache (org.apache.commons.lang3.concurrent.TimedSemaphore
).
For instance if you need to limit to 80 calls in 10 seconds:
private TimedSemaphore sem = new TimedSemaphore(10, TimeUnit.SECONDS, 80);
[...]
public void myMethod() throws InterruptedException {
sem.acquire();
// requests which need throttling
}
您可以使用信号量进行节流。您需要区分您是 A)“每瞬间节流”(同时作业的上限)还是 B)“每间隔节流”(一个间隔内作业的上限)
A)向上和向下计数信号量对于“每瞬间的油门”就足够了。例如
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
public class ThrottlePerPerInstantSample {
private static final int JOBS_COUNT = 100;
private static final int JOBS_THROTTLE_PER_INSTANT = 10;
private static final Semaphore THROTTLE_PER_INSTANT_SEMAPHORE = new Semaphore(
JOBS_THROTTLE_PER_INSTANT);
private static final ExecutorService executorService = Executors
.newFixedThreadPool(JOBS_THROTTLE_PER_INSTANT);
private final static AtomicInteger jobsAtTheSameTimeCounter = new AtomicInteger(
0);
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= JOBS_COUNT; i++) {
THROTTLE_PER_INSTANT_SEMAPHORE.acquire();
final PrintJob printJob = new PrintJob(i, jobsAtTheSameTimeCounter);
final ThrottledJob throttledJob = new ThrottledJob(printJob,
THROTTLE_PER_INSTANT_SEMAPHORE);
executorService.execute(throttledJob);
}
executorService.shutdown();
}
static class ThrottledJob implements Runnable {
private final Runnable delegate;
private final Semaphore throttlePerInstantSemaphore;
public ThrottledJob(Runnable delegate,
Semaphore throttlePerInstantSemaphore) {
super();
this.delegate = delegate;
this.throttlePerInstantSemaphore = throttlePerInstantSemaphore;
}
@Override
public void run() {
try {
delegate.run();
} finally {
throttlePerInstantSemaphore.release();
}
}
}
static class PrintJob implements Runnable {
final int jobNumber;
final AtomicInteger jobsAtTheSameTimeCounter;
public PrintJob(int jobNumber, AtomicInteger jobsAtTheSameTimeCounter) {
super();
this.jobNumber = jobNumber;
this.jobsAtTheSameTimeCounter = jobsAtTheSameTimeCounter;
}
public void run() {
jobsAtTheSameTimeCounter.incrementAndGet();
try {
Thread.sleep(50); // wait some time
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (System.out) {
System.out.println(jobsAtTheSameTimeCounter.getAndDecrement()
+ " : Job " + jobNumber);
}
}
}
}
输出可以是:
10 : Job 1
9 : Job 2
8 : Job 3
7 : Job 5
7 : Job 4
...
9 : Job 87
10 : Job 90
9 : Job 89
10 : Job 91
9 : Job 92
8 : Job 93
7 : Job 94
6 : Job 95
5 : Job 98
4 : Job 97
3 : Job 96
2 : Job 100
1 : Job 99
B)对信号量进行计数并定期将信号量重置为其初始值对于“每个间隔的节流”就足够了。
例如
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class ThrottlePerIntervallSample {
private static final int JOBS_COUNT = 100;
private static final int JOBS_THROTTLE_PER_INTERVALL = 10;
private static final long INTERVALL_IN_UNITS = 1;
private static final TimeUnit UNIT_OF_INTERVALL = TimeUnit.SECONDS;
private static final Semaphore THROTTLE_PER_INTERVALL_SEMAPHORE = new Semaphore(
JOBS_THROTTLE_PER_INTERVALL);
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
.newScheduledThreadPool(JOBS_THROTTLE_PER_INTERVALL + 1);
// plus one because the resetting of the semaphore must be possible!
public static void main(String[] args) throws InterruptedException {
SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(()-> {
THROTTLE_PER_INTERVALL_SEMAPHORE.drainPermits(); // remove permits from previous intervall
THROTTLE_PER_INTERVALL_SEMAPHORE.release(JOBS_THROTTLE_PER_INTERVALL); // set permits for the next intervall
}, INTERVALL_IN_UNITS, INTERVALL_IN_UNITS, UNIT_OF_INTERVALL);
for (int i = 1; i <= JOBS_COUNT; i++) {
THROTTLE_PER_INTERVALL_SEMAPHORE.acquire();
final PrintJob printJob = new PrintJob(i);
SCHEDULED_EXECUTOR_SERVICE.execute(printJob);
}
SCHEDULED_EXECUTOR_SERVICE.shutdown();
}
static class PrintJob implements Runnable {
final int jobNumber;
public PrintJob(int jobNumber) {
super();
this.jobNumber = jobNumber;
}
public void run() {
try {
Thread.sleep(50); // wait some time
} catch (InterruptedException e) {
e.printStackTrace();
}
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss:SSS");
synchronized (System.out) {
System.out.println(simpleDateFormat.format(new Date())
+ " : Job " + jobNumber);
}
}
}
}
输出可以是:
00:42:29:253 : Job 9
00:42:29:255 : Job 2
00:42:29:255 : Job 6
00:42:29:255 : Job 5
00:42:29:255 : Job 10
00:42:29:256 : Job 7
00:42:29:256 : Job 3
00:42:29:256 : Job 1
00:42:29:256 : Job 8
00:42:29:257 : Job 4
00:42:30:140 : Job 11
...
00:42:37:142 : Job 90
00:42:38:140 : Job 91
00:42:38:140 : Job 92
00:42:38:141 : Job 99
00:42:38:141 : Job 93
00:42:38:141 : Job 94
00:42:38:142 : Job 98
00:42:38:142 : Job 96
00:42:38:142 : Job 95
00:42:38:143 : Job 100
00:42:38:143 : Job 97
一些备注:
1) 更好地使用方法 tryAcquire 超时而不是在生产系统中获取,以避免实际死锁!
2)如果您处理许多作业,请在将作业提交到(预定的)执行器服务之前调用 aquire/tryAquire。否则你可能会在那个时候用太多的作业污染线程池的队列。