0

我必须并行化现有的后台任务,而不是串行消耗“x”资源,而是仅使用“y”线程(y << x)并行完成手头的工作。该任务不断在后台运行并不断处理一些资源。

代码结构如下:

class BaseBackground implements Runnable {
    @Override
    public void run() {
        int[] resources = findResources(...);

        for (int resource : resources) {
            processResource(resource);
        }

        stopProcessing();
     }

    public abstract void processResource(final int resource);
    public void void stopProcessing() {
         // Override by subclass as needed
    }
}

class ChildBackground extends BaseBackground {

    @Override
    public abstract void processResource(final int resource) {
        // does some work here
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }
}

我已ChildBackground按以下方式修改:

class ChildBackground extends BaseBackground {

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource) {
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                // does some work
            }
        }
    }
}

我不会每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中有点问题。虽然,我不明白它会有多糟糕,因为我不会在每次迭代中产生超过 10 个线程。

我无法理解如何等待所有ResourceProcessors 完成一次迭代的处理资源,以便我可以重置一些计数并在stopProcessing. 我考虑了以下选项:

1) executorService.awaitTermination(超时)。这不会真正起作用,因为它会一直阻塞直到超时,因为ResourceProcessor线程永远不会真正完成它们的工作

2)我可以在之后找出资源的数量findResources并将其提供给子类,并让每个ResourceProcessor增加处理的资源数量。stopProcessing在重置计数之前,我将不得不等待所有资源都被处理。我需要类似 CountDownLatch 的东西,但它应该算数UP。这个选项会有很多状态管理,我不是特别喜欢。

3)我可以更新public abstract void processResource(final int resource)以包括总资源计数并让子进程等到所有线程都处理完总资源。在这种情况下也会有一些状态管理,但仅限于子类。

在这两种情况下,我都必须添加 wait() 和 notify() 逻辑,但我对我的方法没有信心。这就是我所拥有的:

class ChildBackground extends BaseBackground {

    private static final int UNSET_TOTAL_RESOURCES = -1;

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    private int totalResources = UNSET_TOTAL_RESOURCES;
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0);

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource, final int totalResources) {
        if (this.totalResources == UNSET_TOTAL_RESOURCES) {
            this.totalResources = totalResources;
        } else {
            Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
        }
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        try {
            waitForAllResourcesToBeProcessed();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        resourcesProcessed.set(0);
        totalResources = UNSET_TOTAL_RESOURCES;
        // reset some counts and emit metrics
    }

    private void incrementProcessedResources() {
        synchronized (resourcesProcessed) {
            resourcesProcessed.getAndIncrement();
            resourcesProcessed.notify();
        }
    }

    private void waitForAllResourcesToBeProcessed() throws InterruptedException {
        synchronized (resourcesProcessed) {
             while (resourcesProcessed.get() != totalResources) {
                resourcesProcessed.wait();
             }
        }
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                try {
                   // does some work
                } finally {
                   incrementProcessedResources();
                }
            }
        }
    }
}

我不确定 usingAtomicInteger是否是正确的方法,如果是,我是否需要调用 wait() 和 notify()。如果我不使用 wait() 和 notify(),我什至不必在同步块中执行所有内容。

如果我应该为每次迭代简单地创建和关闭 ExecutorService 或者我应该采用第四种方法,请让我知道您对这种方法的想法。

4

1 回答 1

1

您的代码似乎过于复杂。当队列中已经有队列时,为什么还要有自己的队列ExecutorService?当我认为您可以让库存ExecutorService为您处理时,您必须进行大量管理。

我将您的工作定义为:

public static class ResourceProcessor implements Runnable {
   private final int resource;
   public ResourceProcessor(int resource) {
      this.resource = resource;
   }
   public void run() {
      try {
         // does some work
      } finally {
         // if this is still necessary then you should use a `Future` instead
         incrementProcessedResources();
      }
   }
}

然后你可以像这样提交它们:

ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
     executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();

executorService.awaitTermination(timeout). 这不会真正起作用,因为它会一直阻塞直到超时,因为 ResourceProcessor 线程永远不会真正完成它们的工作

这现在可以工作了。

2)我可以找出资源的数量[已完成]。

如果你可以打电话,你还需要这个awaitTermination(...)吗?

3) 我可以更新公共抽象 void processResource(final int resource) 以包括总资源计数并让子进程等到所有线程都处理完总资源...

同样的问题。需要这个吗?

如果您确实需要知道已处理请求的列表,那么您可以像@ScaryWombat 提到的那样使用Future<Integer>Callable<Integer>或使用ExecutorCompletionService.

Futures 不是一个选项,因为 executor 线程在一个紧密的循环中运行,只有在服务被停用时才会停止。

你能再解释一下吗?

希望这可以帮助。

于 2016-09-14T00:59:44.587 回答