0

我有两个主线程。一个产生新线程,另一个监听结果,如下所示:

//Spawner
while(!done) {
    spawnNewProcess(nextId, parameters);

    myListener.listenFor(nextId);

    nextId ++;
}

spawnNewProcess 方法需要大量可变的时间。当它完成时,它会将一个结果对象放入一个可以通过 Id 访问的映射中。

侦听器线程运行如下:

//Listener
while(!done) {
    for (int id : toListenFor) {
        if (resultMap.contains(id)) {
            result = resultMap.get(id);

            toListenFor.remove(id);

            process(result);
        }
    }
}

我无法更改 spawnNewProcess 方法,也无法更改它存储结果的方式。我想要做的是设置可以同时进行的最大限制。我的第一个倾向是让一个变量跟踪那个数字。如果超过最大值,则生成器将等待。当结果返回时,侦听器将通知它。像这样:

//Spawner2

AtomicInteger numSpawns = new AtomicInteger(0);
int maxSpawns = 10;

while(!done) {
    if (numSpawns.intValue() >= maxSpawns) {
        this.wait(0);
    }
    numSpawns.getAndIncrement;

    spawnNewProcess(nextId, parameters);

    myListener.listenFor(nextId);

    nextId ++;
}

听者是:

//Listener2
while(!done) {
    for (int id : toListenFor) {
        if (resultMap.contains(id)) {
            result = resultMap.get(id);

            toListenFor.remove(id);

            numSpawns.getAndDecrement();
            Spawner.notify();

            process(result);
        }
    }
}

这行得通吗?是否存在我遗漏的潜在死锁?如果以某种方式同时运行 11 个或 9 个产卵而不是 10 个,那将不是什么大不了的事。还是有更好的方法我没有注意到?

4

2 回答 2

2

使用Semaphore.

import java.util.concurrent.Semaphore;
private Semaphore sem = new Semaphore(NUM_MAX_CONCURRENT);

// Spawner
while(!done) {

    sem.acquire(); // added by corsiKa

    spawnNewProcess(nextId, parameters);

    myListener.listenFor(nextId);

    nextId ++;
}

// listener
while(!done) {
    for (int id : toListenFor) {
        if (resultMap.contains(id)) {
            result = resultMap.get(id);

            toListenFor.remove(id);
            sem.release(); // added by corsiKa
            process(result);
        }
    }
}
于 2013-03-01T18:51:27.333 回答
0

要控制运行的生成器数量,请使用 a Executors.newFixedThreadPool(size),它一次运行的任务数量始终不超过固定数量。然后将生成任务包装在 a 中Runnable并将它们传递给ExecutorService.

while(!done) {
    task = new Runnable() { public void run() {
        spawnNewProcess(nextId, parameters);
    } });

    exec.submit(task);;

    nextId ++;
}

要返回结果,请使用SynchronousQueueor ConcurrentLinkedQueue,这将允许您在线程之间传递对象,而无需使用较低级别的并发对象。

于 2013-03-01T18:59:55.327 回答