4

我有一个任务队列,以及一个在几秒钟内查看队列一次的线程,如果有任务它会执行它。

我有另一个代码部分(当然在另一个线程中),它在循环中创建任务(我无法从循环外部提前知道任务的数量)并将它们插入队列。任务包含一些“结果”对象,外部线程(创建这些任务)需要等待所有任务完成并最终从每个任务中获取结果。问题是我无法将 java Semaphore\CountDownLatch 等传递给结果对象,因为我事先不知道监视器的数量。我也不能使用使用 invokeAll 的 Executor 或等待 Future 对象,因为任务是不同步的(外部线程只是将任务推送到队列中,另一个线程将在他有时间时执行任务)。

我想到的唯一解决方案是创建一些“反转信号量”类,该类包含一组结果和一个监视器计数器。getResult 函数将检查计数器是否 == 0,如果答案是肯定的,将通知某个锁对象,getResult 函数将等待此锁:

public class InvertedSemaphore<T> {
    Set<T> resultSet;
    int usages;
    final Object c;

    public InvertedSemaphore() {
        resultSet = Collections.synchronizedSet(new HashSet<T>());
        usages = 0;
        c = new Object();
    }

    public void addResult(T result) {
        resultSet.add(result);
    }

    public void addResults(Set<T> result) {
        resultSet.addAll(result);
    }


    public void acquire() {
        usages++;
    }

    public void release() {
        synchronized (c) {
            if (--usages == 0) {
                c.notify();
            }
        }
    }

    public Set<T> getResults() {
        synchronized (c) {
            try {
                while (usages > 0) {
                    c.wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return resultSet;
    }

}

每个 addTask 方法都会调用 semaphore.acquire,每个(未同步的)任务都会在任务结束时调用 semaphore.release。

这听起来很复杂,我很确定在 java 并发库或其他东西中有更好的解决方案。

任何想法都会被应用:)

4

3 回答 3

4

如果不需要按顺序处理任务,请使用ExecutorCompletionService

更一般地,没有必要使用invokeAllonExecutorService来获得Future结果。ExecutorService#submit可以用于此目的,或者可选地,正在创建的任务可以Future自行实现,从而允许任务的创建者在稍后的时间点请求结果。

一些代码:

class MyTask {
    AtomicReference<?> result = new AtomicReference<?>();

    void run() {
       //do stuff here
       result.set(/* the result of the calculation */);
    }

    boolean resultReady() {
        return result.get()!=null;
    }

    ? get() {
        return result.get();
    }
}

...代码中的其他地方

void createTasks() {
    Collection<MyTask> c = new ...;

    while(indeterminable condition) {
        MyTask task = new MyTask();
        c.add(task);
        mysteryQueue.add(task);
    }

    while(haven't received all results) {
        MyTask task = c.get(...); //or iterate or whatever
        ? result = task.get();
        if (result!=null) {
            //do stuff, probably remove the task from the collection c would be smart
        }
    }
}
于 2012-07-25T17:19:08.270 回答
1

One idea would be to use a separate queue for the results.
So you will have one blocking queue that thread A places tasks for thread B thereby having a producer-consumer approach, and when each task is completed, the result could be placed in the second result queue inverting the consumer-producer roles since now thread A that originally created the tasks will consume the result from the second queue.

于 2012-07-25T17:35:01.083 回答
1

您可以执行以下操作:每个生产者都将拥有自己的队列。生产者将向任务本身传递一种向该队列报告的方法。当任务完成运行时,它会将其结果排队到此队列中。这是一些代码描述的野兽:

class Result{}

interface IResultCallback{
    void resultReady(Result r); // this is an abstraction of the queue
}

class Producer implements IResultCallback{ 
    // the producer needs to pass itself to the constructor of the task,
    // the task will only see its "resultReady" facade and will be able to report to it. 
    // the producer can aggragte the results at it will and execute its own computation as 
        // as soon it is ready

    Queue<Result> results; // = init queue

    @Override
    public void resultReady(Result r) {
        results.add(r);

        if(results.size() == 9){
            operate();
        }
        results.clear();
    }

    public void operate(){
        // bla bla
    }
}

public class Task {
    IResultCallback callback;

    public Task(IResultCallback callback){
        this.callback = callback;
    }
    public void execute(){
        // bla bla

        Result r = null; // init result;
        callback.resultReady(r);
    }
}
于 2012-07-25T18:17:40.183 回答