1

解释起来有点复杂,但我们开始吧。基本上,问题是“如何以有效的方式将问题分解为子问题”。这里的“高效”意味着分解的子问题尽可能大。基本上,如果我根本不需要分解问题,那将是理想的。然而,因为一个工人只能处理特定的问题,我确实需要分手。但我想找到一种尽可能粗略的方法。

这是一些伪代码..

我们有这样的问题(对不起,它是在 Java 中。如果你不明白,我很乐意解释)。

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

一个子问题是:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

那么,工作将是这样的。

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

现在我们有了 'Worker' 的实例,它们有自己的 state sections I have

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

唷。

所以,我们有很多Problems 并且Workers不断地要求更多SubProblems。我的任务是分解ProblemsSubProblem交给他们。然而,困难在于,我必须稍后收集 SubProblems 的所有结果并将它们合并(减少)到Result整个Problem.

然而,这是昂贵的,所以我想给工人尽可能大的“块”(尽可能多targetedSections)。

它不必是完美的(在数学上尽可能高效或其他)。我的意思是,我想不可能有一个完美的解决方案,因为你无法预测每次计算需要多长时间等等。但是有一个很好的启发式解决方案吗?或者也许我可以在开始设计之前阅读一些资源?

任何建议都非常感谢!

编辑:我们也可以控制部分分配,因此控制它是另一种选择。基本上,对此的唯一限制是工人只能拥有固定数量的部分。

4

1 回答 1

1

好的,看起来你有一个网络服务的分片模型,我们做了类似的事情,我们使用“entityId”(sectionId)的反向索引到“client”(worker),它将连接到特定的网络服务,将处理该特定实体。最简单的方法(见下文)是使用 id 到 worker 的反向映射。如果内存是一个约束,另一种可能性是使用一个函数(例如 sectionId % 服务数)。

为了给服务提供尽可能多的工作,有一个简单的批处理算法可以将批处理填充到某个用户指定的最大值。这将允许根据远程服务能够使用它们的速度来粗略地调整工作块的大小。

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}
于 2010-03-28T13:41:37.877 回答