1

所以在 Java 并发中,有一个任务的概念,它实际上是任何实现RunnableCallable(更具体地说,是该接口的重写run()call()方法)。

我很难理解以下之间的关系:

  • 一个任务(Runnable/ Callable);和
  • 任务ExecutorService提交给;和
  • 底层的并发工作队列或列表结构,由ExecutorService

相信这种关系是以下几点:

  • 作为开发人员,您必须选择ExecutorService最适合手头任务的工作结构
  • 您使用要使用的基础结构初始化ExecutorService(例如,作为 a )(例如, an )(如果是,如何?!?!ScheduledThreadPoolArrayBlockingQueue
  • 您将任务提交给ExecutorService,然后使用其线程/池策略用任务的副本填充给定的结构(ABQ 或其他)
  • 现在,每个衍生/池化线程都从工作结构中提取任务副本并执行它

首先,如果我不符合上述任何假设,请更正/澄清上述任何假设!

其次,如果任务只是在底层工作结构中一遍又一遍地复制/复制(例如,列表的每个索引中的相同副本),那么您如何将一个大问题分解为更小的(并发)问题?换句话说,如果任务只是执行步骤 A - Z,并且您有一个包含 1,000 个这些任务的 ABQ,那么每个线程不会也只执行 A - Z 吗?你怎么说“一些线程应该在 A - G 上工作,而其他线程应该在 H 上工作,而其他线程应该在 I - Z 上工作”等等?

对于第二个,我可能需要一个代码示例来可视化它们是如何组合在一起的。提前致谢。

4

1 回答 1

2

您的最后一个假设并不完全正确。ExecutorService不会拉取任务的副本。程序必须单独提供由ExecutorService. 当一个任务完成后,队列中的下一个任务将被执行。

AnExecutorService是用于处理线程池的接口。您通常在池上执行多个任务,每个任务都针对问题的不同部分进行操作。作为开发人员,您必须在创建任务时指定每个任务应处理的问题的哪些部分,然后再将其发送到ExecutorService. 每个任务的结果(假设他们正在解决一个常见问题)应该添加到一个BlockingQueue或其他并发集合中,另一个线程可以使用结果或等待所有任务完成。

这是一篇您可能想阅读的关于如何使用的文章ExecutorServicehttp ://www.vogella.com/articles/JavaConcurrency/article.html#threadpools

更新: 的一个常见用途ExecutorService是实现生产者/消费者模式。这是我快速整理的一个示例以帮助您入门——它仅用于演示目的,为简单起见,省略了一些细节和关注点。线程池包含多个生产者线程和一个消费者线程。正在执行的工作是将 0...N 中的数字相加。每个生产者线程对较小的数字区间求和,并将结果发布到BlockingQueue. 消费者线程处理添加到BlockingQueue.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NumberCounter {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue(100);

    public void startCounter(int max, int workers) {
        // Create multiple tasks to add numbers. Each task submits the result
        // to the queue.
        int increment = max / workers;
        for (int worker = 0; worker < workers; worker++) {
            Runnable task = createProducer(worker * increment, (worker + 1) * increment);
            pool.execute(task);
        }

        // Create one more task that will consume the numbers, adding them up
        // and printing the results.
        pool.execute(new Runnable() {

            @Override
            public void run() {
            int sum = 0;

            while (true) {
                try {
                Integer result = queue.take();
                sum += result;
                System.out.println("New sum is " + sum);
                } catch (InterruptedException e) {
                e.printStackTrace();
                }
            }

            }
        });

    }

    private Runnable createProducer(final int start, final int stop) {
        return new Runnable() {

            @Override
            public void run() {
            System.out.println("Worker started counting from " + start + " to " + stop);
            int count = 0;
            for (int i = start; i < stop; i++) {
                count += i;
                }
                queue.add(count);
            }

        };
    }

    public static void main(String[] args) throws InterruptedException {
        NumberCounter counter = new NumberCounter();
        counter.startCounter(10000, 5);
    }

}
于 2012-05-02T02:32:08.660 回答