5

我正在寻找一种实现,ExecutorService它将提供以下语义。每个线程都由一个“工作人员”占用,该工作人员根据输入执行某些任务。每个工作者都保证只在单个线程中执行,因此,应该允许它在一个任务之间维护状态,而没有同步的开销,因为它将在单个线程中与自身同步。

因此,假设我有 100 个输入和 10 个工人,我希望能够编写如下内容:

for (Input input: inputs) {
    // The following code would pass t to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

请注意,每个 Worker 对任何给定的输入都会做不同的事情。输入不是可运行的代码块,它只是工人的一个参数。每个工人决定如何处理输入。不过,为了使其更简单,工作人员实现了一个接口,允许以多态方式调用它,接收输入。

我已经使用 a 破解了一些可行的东西,我的瘦包装器在Map<Worker, WorkerExecutor>哪里a ,并且每个线程池中只有一个 Worker 实例将运行。我更愿意找到知道自己在做什么的人写的东西:-)WorkerExecutorExecutors.newSingleThreadPool


潜在的低效率我可以接受

我意识到这种语义会导致效率低下,但是,我试图在开发时间方面获得最大的收益,并且重新设计 Worker 的每个实现以实现线程安全并非易事。我的意思是效率低下是执行可能/将看起来像这样(在此示例中模拟最多 2 个活动线程):

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

问题是 Worker 3 完成后,没有任务可做,直到 Worker 4 完成后才能完成任何工作。这可能是 CPU 可以空闲的任意长的时间。


这样的ExecutorService存在吗?

4

2 回答 2

2

听起来你真正想要的是演员。简单地说,actor 是一个在单个线程中运行的对象,并且有一个任务“邮箱”,它负责按顺序处理这些任务。Akka似乎是当前在 JVM 上提供 actor 的领先库/框架。看看那边。

于 2013-02-23T21:02:11.250 回答
1

类似于以下内容:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// you implement this for each of your non-parallelisable jobbies
interface Worker<T> {
    public void process(T input);
}

// implementation detail
class Clerk<T> {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    public Clerk(Worker<T> worker) {
        this.worker = worker;
    }

    public void process(final T input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                worker.process(input);
            }
        });
    }
}

// make one of these, and give it all your workers, then give it input
class Workshop<T> {
    private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();

    public void addWorker(Worker<T> worker) {
        // mutable; you love it
        clerks.add(new Clerk<T>(worker));
    }

    public void process(T input) {
        for (Clerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    public void processAll(Iterable<T> inputs) {
        for (T input : inputs) {
            process(input);
        }
    }
}

也许?

于 2013-02-27T18:30:40.193 回答