我必须并行化现有的后台任务,而不是串行消耗“x”资源,而是仅使用“y”线程(y << x)并行完成手头的工作。该任务不断在后台运行并不断处理一些资源。
代码结构如下:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
我已ChildBackground
按以下方式修改:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
我不会每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中有点问题。虽然,我不明白它会有多糟糕,因为我不会在每次迭代中产生超过 10 个线程。
我无法理解如何等待所有ResourceProcessor
s 完成一次迭代的处理资源,以便我可以重置一些计数并在stopProcessing
. 我考虑了以下选项:
1) executorService.awaitTermination(超时)。这不会真正起作用,因为它会一直阻塞直到超时,因为ResourceProcessor
线程永远不会真正完成它们的工作
2)我可以在之后找出资源的数量findResources
并将其提供给子类,并让每个ResourceProcessor
增加处理的资源数量。stopProcessing
在重置计数之前,我将不得不等待所有资源都被处理。我需要类似 CountDownLatch 的东西,但它应该算数UP
。这个选项会有很多状态管理,我不是特别喜欢。
3)我可以更新public abstract void processResource(final int resource)
以包括总资源计数并让子进程等到所有线程都处理完总资源。在这种情况下也会有一些状态管理,但仅限于子类。
在这两种情况下,我都必须添加 wait() 和 notify() 逻辑,但我对我的方法没有信心。这就是我所拥有的:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
我不确定 usingAtomicInteger
是否是正确的方法,如果是,我是否需要调用 wait() 和 notify()。如果我不使用 wait() 和 notify(),我什至不必在同步块中执行所有内容。
如果我应该为每次迭代简单地创建和关闭 ExecutorService 或者我应该采用第四种方法,请让我知道您对这种方法的想法。