0

我正在开发一个带有线程池的系统,该线程池将提交链式任务。每个任务都将运行,检查其结果,然后根据结果执行另一个任务。这是在同一个线程中完成的,因此它不会将作业重新提交回线程池。每个任务都会将其结果添加到一个包含 Collection 并提供一些其他次要功能的对象中,然后将其传递给链中的下一个任务。一旦到达链的末端,结果将在 Future.get() 中返回,并具有超时时间,并且可以分析完整的结果。

每个任务都将调用一个外部服务,即一个 SOAP 请求。如果任务挂起,它很可能正在等待并等待响应。在这种情况下,如果 get() 超时,我会捕获 TimeoutException 并取消(true)未来。这意味着在任务线程中抛出 InterruptedException(因为它可能处于等待状态)并且可以被捕获。异常包装在响应对象中并放置在 Collection 包装器中。

现在,如果超时,我似乎无法从 Future 对象中取回该集合。我正在考虑挂在原始任务(或任务包装器)上,如果 TimeoutException 被捕获并且 Future 被取消,则从原始任务中检索 Collection 包装器,它应该包含直到点的结果暂停。

但是,我不知道这本身是线程安全的还是竞争条件。如果主线程在取消 Future 对象后立即尝试访问被包装的 Collection,那么被包装的异常会在那里吗?它会在并发迭代和修改的尝试中结束吗?有没有办法确保包装的异常在主线程检索它之前进入该集合?

一些问答示例代码:

public class MyCallable implements Callable<MyResponseCollection> {

    private MyResponseCollection responses = new MyResponseCollection();  //Wraps a List
    private MyTask task; //This is set prior to submitting the task wrapper

    public MyResponseCollection call() throws Exception() {
        task.setCollection(responses);
        task.call(); //kicks off the task chain, not an override to Callable.call
        return responses; //If all goes well, this is passed into the Future
    }
}

public class MyTask {

    private MyResponseCollection responses;

    public void setCollection(MyResponseCollection responses){
        this.responses = responses;
    }

    public void call(){
        try{
            MyResponse m = this.doStuff();
            responses.add(m);
            this.executeNext(m); //Runs the next task based on the response, simplified here, responses object passed into the next task

        } catch (InterruptedException e){
            responses.add(new ExceptionResponse(e)); //Here's where we catch that potential interrupt
        }
     }
     public MyResponse doStuff() throws InterruptedException{
         //All work done here
     }
}

这是我对多线程的第一次重大尝试,所以我不太确定如何确保线程等之间的操作顺序,或者我是否在这里做一些愚蠢的事情并且有更好的解决方案。MyResponseCollection 上的 synchronize() 块是否也适用于其中的列表?是否要求所述列表也是 Collections.synchronizedList?

4

1 回答 1

1

为了让一个线程在资源被释放时向另一个线程指示,我会Semaphore isDone在资源中添加一个字段。

public class MyResponseCollection {
    public final Semaphore isDone = new Semaphore(0);
}

public class MyCallable implements Callable<MyResponseCollection> {
    ...
    public MyResponseCollection call() throws Exception() {
        task.setCollection(responses);
        task.call(); //kicks off the task chain, not an override to Callable.call
        responses.isDone.acquire();
        return responses; //If all goes well, this is passed into the Future
    }
}


public class MyTask {
    ...

    public void call(){
        try{

        } finally {
            responses.isDone.release();
        }
     }
 }

responses.isDone.acquire()将阻塞直到许可可用,isDone初始化为零许可。 MyTask#call()在它的块中添加一个许可证,finally它会唤醒MyCallable.

于 2013-08-14T18:37:52.757 回答