我正在寻找实现以下内容的最简单、最直接的方法:
- 主程序实例化工作线程来执行任务。
- 只有
n
任务可以同时运行。 - 达到时
n
,将不再启动任何工作线程,直到运行线程数回落到n
.
我正在寻找实现以下内容的最简单、最直接的方法:
n
任务可以同时运行。n
,将不再启动任何工作线程,直到运行线程数回落到n
.我认为Executors.newFixedThreadPool符合您的要求。有许多不同的方法可以使用生成的 ExecutorService,这取决于您是否希望将结果返回到主线程,或者任务是否完全独立,以及您是否有一组任务要预先执行,或者任务是否排队以响应某些事件。
Collection<YourTask> tasks = new ArrayList<YourTask>();
YourTask yt1 = new YourTask();
...
tasks.add(yt1);
...
ExecutorService exec = Executors.newFixedThreadPool(5);
List<Future<YourResultType>> results = exec.invokeAll(tasks);
或者,如果您有一个新的异步任务要执行以响应某个事件,您可能只想使用 ExecutorService 的简单execute(Runnable)
方法。
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
/* ...execute the task to run concurrently as a runnable: */
exec.execute(new Runnable() {
public void run() {
/* do the work to be done in its own thread */
System.out.println("Running in: " + Thread.currentThread());
}
});
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
/* The tasks are now running concurrently. We wait until all work is done,
* with a timeout of 50 seconds: */
boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
/* If the execution timed out, false is returned: */
System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
Executors.newFixedThreadPool(int)
Executor executor = Executors.newFixedThreadPool(n);
Runnable runnable = new Runnable() {
public void run() {
// do your thing here
}
}
executor.execute(runnable);
使用 Executor 框架;即newFixedThreadPool(N)
如果您的任务队列不是无限的并且任务可以在更短的时间间隔内完成,您可以使用Executors.newFixedThreadPool(n)
; 正如专家建议的那样。
此解决方案的唯一缺点是任务队列大小不受限制。你无法控制它。任务队列中的大量堆积会降低应用程序的性能,并且在某些情况下可能会导致内存不足。
如果您想使用ExecutorService
和启用work stealing
空闲工作线程通过窃取任务队列中的任务来分担繁忙工作线程的工作负载的机制。它将返回 ForkJoinPool 类型的 Executor Service。
公共静态
ExecutorService newWorkStealingPool
(int 并行性)创建一个线程池来维护足够的线程来支持给定的并行级别,并且可以使用多个队列来减少争用。并行度级别对应于主动参与或可参与任务处理的最大线程数。实际线程数可能会动态增长和收缩。工作窃取池不保证提交任务的执行顺序。
我更喜欢ThreadPoolExecutor
API 的灵活性来控制许多参数,这些参数控制流任务的执行。
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
在您的情况下,同时设置corePoolSize and maximumPoolSize as N
. 在这里,您可以控制任务队列大小,定义自己的自定义线程工厂和拒绝处理程序策略。
查看相关的 SE 问题以动态控制池大小:
如果你想自己动手:
private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);
private boolean roomLeft() {
synchronized (workers) {
return (workers.size() < MAX_WORKERS);
}
}
private void addWorker() {
synchronized (workers) {
workers.add(new Worker(this));
}
}
public void removeWorker(Worker worker) {
synchronized (workers) {
workers.remove(worker);
}
}
public Example() {
while (true) {
if (roomLeft()) {
addWorker();
}
}
}
Worker 是扩展 Thread 的类。每个工人将调用这个类的 removeWorker 方法,将自己作为参数传入,当它完成它的事情时。
话虽如此,Executor 框架看起来好多了。
编辑:任何人都愿意解释为什么这很糟糕,而不是仅仅降低它?
正如其他人在这里提到的那样,最好的办法是使用Executors类创建一个线程池:
但是,如果你想自己动手,这段代码应该让你知道如何继续。基本上,只需将每个新线程添加到线程组,并确保组中的活动线程永远不会超过 N:
Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
if( group.activeCount()<N && i<tasks.length ) {
new TaskThread(group, tasks[i]).start();
i++;
} else {
Thread.sleep(100);
}
}