2

当我尝试学习 Java 线程时,我通常会遇到同一个类中的代码示例wait()notify()实际上几乎所有这些都是生产者-消费者示例)。在谷歌搜索各种示例后,不幸的是我找不到我需要的案例:

  • 管理器线程最初创建 n 个线程(并启动它们),其中 http get 请求在单个线程中完成。
  • 对于单个工作线程,完成其生命周期大约需要 20-30 秒。
  • 在这里,我的经理线程必须知道哪些工人已经完成,并用新的替换完成线程。

我想过这样的方法(让 n 为 5):

List<Runnable> runnables = new ArrayList<Runnable>();
for(int i = 0 ; i < 5 ; i++){
    runnables.add(new MyWorker(params));
}
for(Runnable myWorker : runnables){
   myWorker.run();
}

由于wait()不支持多个对象,我不能从这里继续。另一种解决方案是在为每个工作人员调用一些 isFinished 标志的管理器线程上实现繁忙等待。但我不确定这是否是一个好方法(据我所知这是资源浪费)

4

2 回答 2

3

Java 6 及更高版本有一个即用型工具,形式为执行器框架。

由于您希望拥有固定的线程池,因此最好的选择是使用:

ExecutorService service = Executors.newFixedThreadPool(5);

然后,您可以使用该方法(由接口定义)提交您的Runnable实例。一旦池中的线程可用,它们将被提交到工作队列并出列。.execute()Executor

如果您的各个线程工作方法返回一个值(即它们实现Callable<Something>),那么您可以使用该.submit()方法(由 定义ExecutorService,也是一个扩展的接口Executor),它将返回一个Future<Something>您将从.get()中计算的值。

终止线程池有多种方式:.shutdown()是最基本的,将同步等待仍然活动的线程终止(并阻止提交新作业)。

Javadocs:ExecutorsExecutorServiceThreadPoolExecutor

其他链接:一本你应该为所有与线程相关的东西买的书(但不包括 Java 7 ForkJoinPool)。

PS:多么幸运,上面本书的示例章节(PDF)涵盖了任务执行;)

于 2013-01-01T22:47:50.413 回答
1

您可以使用semaphorewait/notify方法来完成您正在尝试做的事情。方法是:

  1. semaphore一次允许的最大线程数初始化。
  2. 等到任务可用queue
  3. 获取一个semaphore.
  4. 运行任务并在完成时释放semaphore.

将所有四个步骤都放在永远的 while 循环中,您就可以使用任务执行器了。这仅用于学习目的,正如@fge 所说,已经存在ThreadPoolExecuter可以为您做同样事情并且更加优化的东西。

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class ThreadExecuter extends Thread {

    Queue<Runnable> tasks = new LinkedList<Runnable>();
    Semaphore s;
    Object bin = new Object();

    public ThreadExecuter(int n) {
        s = new Semaphore(n);
    }

    public void addTask(Runnable r) {
        synchronized (bin) {
            tasks.add(r);
            bin.notifyAll();
        }
    }

    public void run() {
        while (true) {
            try {
                final Runnable r;
                synchronized (bin) {
                    while (tasks.isEmpty())
                        bin.wait();
                    r = tasks.poll();
                }

                s.acquire();
                new Thread() {
                    public void run() {
                        r.run();
                        s.release();
                    }
                }.start();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }
}

主要方法如下所示:

import java.util.Random;

public class ThreadTest {

    /**
     * @param args
     */
    public static void main(String[] args) {
            /* to make maximum 10 threads each running 1 runnable */
        ThreadExecuter executer = new ThreadExecuter(10);
        executer.start();

        for(int i = 0; i < 5000; i++) {
                    /* add task in executer, this is non blocking */
            executer.addTask(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Task Executed in " 
                                        + Thread.currentThread().getId());
                    try {
                        Thread.sleep(new Random().nextInt(8000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

}
于 2013-01-01T22:50:59.710 回答