1

请耐心等待,因为我在多线程编程方面并不是很精通......

我目前正在构建一个将 ThreadPool ExecutorService 用于各种可运行的系统。这很简单。但是,我正在考虑让可运行文件本身根据原始可运行文件中发生的情况产生额外的可运行文件的可能性(即,如果成功,则执行此操作,如果失败,则执行此操作等,因为某些任务必须在其他任务之前完成执行)。需要注意的是,不需要将这些任务的结果通知主线程,尽管它对于处理异常可能很方便,即,如果无法联系到外部服务并且所有线程都因此抛出异常,那么停止提交任务并定期检查外部服务,直到它恢复。这不是完全必要的,但它会很好。

即,提交任务A。任务A 做了一些事情。如果一切顺利,任务 A 将执行任务 B。如果某些事情不正常或抛出异常,则执行任务 C。每个子任务也可能有额外的任务,但只有几个层次。我宁愿在单个任务中做这样的事情,也不愿在单个任务中使用大而复杂的条件,因为这种方法可以提供更大的灵活性。

但是,我不确定这将如何影响线程池。我会假设从池中的线程内创建的任何其他线程都将存在于池之外,因为它们本身并未直接提交到池中。这是一个正确的假设吗?如果是这样,这可能是一个坏主意(好吧,如果不是,它可能无论如何都不是一个好主意),因为它可能会在原始线程完成时导致更多线程,并且在线程从较早的任务仍在进行(并且可能比其他任务持续时间更长)。

我还考虑将这些实现为 Callables,并将一个响应对象放在返回的 Future 中,然后根据响应将适当的 Callable 添加到线程池中。但是,这会将所有操作绑定到主线程,这似乎是一个不必要的瓶颈。我想我可以将一个 Runnable 放入池中,该池本身处理 Callable 和后续操作的执行,但是我得到的线程数是原来的两倍。

我是在正确的轨道上还是我完全偏离了轨道?

4

3 回答 3

0

有很多方法可以做你想做的事。您需要小心,不要最终创建太多线程。

以下是一个示例,您可以使用 ExecutorCompletionService 提高效率,或者您可以使用 Runnable 的。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class ThreadsMakeThreads {

    public static void main(String[] args) {
        new ThreadsMakeThreads().start();
    }

    public void start() {
        //Create resources
        ExecutorService threadPool = Executors.newCachedThreadPool();
        Random random = new Random(System.currentTimeMillis());
        int numberOfThreads = 5;

        //Prepare threads
        ArrayList<Leader> leaders = new ArrayList<Leader>();
        for(int i=0; i < numberOfThreads; i++) {
            leaders.add(new Leader(threadPool, random));
        }

        //Get the results
        try {
            List<Future<Integer>> results = threadPool.invokeAll(leaders);
            for(Future<Integer> result : results) {
                System.out.println("Result is " + result.get());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        threadPool.shutdown();

    }

    class Leader implements Callable<Integer> {

        private ExecutorService threadPool;
        private Random random;

        public Leader(ExecutorService threadPool, Random random) {
            this.threadPool = threadPool;
            this.random = random;
        }

        @Override
        public Integer call() throws Exception {
            int numberOfWorkers = random.nextInt(10);
            ArrayList<Worker> workers = new ArrayList<Worker>();
            for(int i=0; i < numberOfWorkers; i++) {
                workers.add(new Worker(random));
            }
            List<Future<Integer>> tasks = threadPool.invokeAll(workers);
            int result = 0;
            for(Future<Integer> task : tasks) {
                result += task.get();
            }
            return result;
        }

    }

    class Worker implements Callable<Integer> {

        private Random random;

        public Worker(Random random) {
            this.random = random;
        }

        @Override
        public Integer call() throws Exception {
            return random.nextInt(100);
        }

    }
}
于 2013-08-06T20:36:28.170 回答
0

我从未使用过它,但它对您很有用:http: //docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

于 2013-08-06T18:18:35.850 回答
0

将其他任务中的任务提交到线程池中是非常有意义的想法。但恐怕你会想到在单独的线程上运行新任务,那真的会吃掉所有的内存。只需在创建池时设置线程数限制,然后向该线程池提交新任务。

这种方法可以在不同的方向进一步阐述。首先,将任务视为具有接口方法的普通对象,并让这些方法决定是否要将这个对象提交给线程池。这要求每个任务都知道它的线程池——在创建时将其作为参数传递。更方便的是,保持对线程池的引用作为线程局部变量。

您可以轻松地模拟函数式编程:一个对象代表一个函数调用,并且对于每个参数,它都有相应的 set 方法。设置好所有参数后,将对象提交到线程池。

另一个方向是actor编程:任务类只有一个set方法,但可以多次调用,如果前面的参数还没有处理,set方法不会将任务提交到线程池,而是简单的将它的参数存储在一个队列。run() 方法处理队列中所有可用的参数,然后返回。

所有这些功能都在数据流库https://github.com/rfqu/df4j中实现。我故意写它来支持基于任务的并行性。

于 2013-08-06T20:49:28.970 回答