4

我正在使用 Java ExecutorService 框架来提交可调用的任务以供执行。这些任务与 Web 服务通信,并应用 5 分钟的 Web 服务超时。但是,我已经看到,在某些情况下,超时被忽略并且线程在 API 调用上“挂起”——因此,我想取消所有耗时超过 5 分钟的任务。

目前,我有一个期货列表,我遍历它们并调用 future.get 直到所有任务都完成。现在,我已经看到future.get 重载方法需要一个超时并在该窗口中任务未完成时引发超时。所以我想到了一种方法,我在超时的情况下执行 future.get(),在 TimeoutException 的情况下,我执行 future.cancel(true) 以确保此任务被中断。

我的主要问题
1. 超时获取是解决此问题的最佳方法吗?
2. 是否有可能我正在等待尚未放置在线程池中的任务的 get 调用(不是活动的工作人员)。在那种情况下,我可能会终止一个线程,当它启动时,它实际上可能在所需的时间限制内完成?

任何建议将不胜感激。

4

4 回答 4

2
  1. 超时获取是解决此问题的最佳方法吗?

    • 这还不够。例如,如果您的任务不是为响应中断而设计的,它将继续运行或被阻止
  2. 是否有可能我正在等待尚未放置在线程池中的任务的 get 调用(不是活动的工作人员)。在那种情况下,我可能会终止一个线程,当它启动时,它实际上可能在所需的时间限制内完成?

    • 是的,如果您的线程池配置不正确,您最终可能会取消作为从未计划运行的任务

当您的任务包含不可中断阻塞时,以下代码片段可能是您使任务响应中断的一种方式。它也不会取消未计划运行的任务。这里的想法是通过关闭套接字,数据库连接等来覆盖中断方法并关闭正在运行的任务。这段代码并不完美,您需要根据要求进行更改,处理异常等。

class LongRunningTask extends Thread {
private  Socket socket; 
private volatile AtomicBoolean atomicBoolean;


public LongRunningTask() {
    atomicBoolean = new AtomicBoolean(false);
}

@Override
public void interrupt() {
    try {
        //clean up any resources, close connections etc.
        socket.close();
    } catch(Throwable e) {
    } finally {
        atomicBoolean.compareAndSet(true, false);
        //set the interupt status of executing thread.
        super.interrupt();
    }
}

public boolean isRunning() {
    return atomicBoolean.get();
}

@Override
public void run() {
    atomicBoolean.compareAndSet(false, true);
    //any long running task that might hang..for instance
    try {
        socket  = new Socket("0.0.0.0", 5000);
        socket.getInputStream().read();
    } catch (UnknownHostException e) {
    } catch (IOException e) {
    } finally {

    }
}
}
//your task caller thread
//map of futures and tasks 
    Map<Future, LongRunningTask> map = new HashMap<Future, LongRunningTask>();
    ArrayList<Future> list = new ArrayList<Future>();
    int noOfSubmittedTasks = 0;

    for(int i = 0; i < 6; i++) {
        LongRunningTask task = new LongRunningTask();
        Future f = execService.submit(task);
        map.put(f, task);
        list.add(f);
        noOfSubmittedTasks++;
    }

    while(noOfSubmittedTasks > 0) {
        for(int i=0;i < list.size();i++) {
            Future f = list.get(i);
            LongRunningTask task = map.get(f);
            if (task.isRunning()) {
                /*
                 * This ensures that you process only those tasks which are run once
                 */
                try {
                    f.get(5, TimeUnit.MINUTES);
                    noOfSubmittedTasks--;
                } catch (InterruptedException e) {
                } catch (ExecutionException e) {
                } catch (TimeoutException e) {
                                            //this will call the overridden interrupt method
                    f.cancel(true);
                    noOfSubmittedTasks--;
                }
            }

        }
    }
    execService.shutdown();
于 2013-08-30T11:15:27.673 回答
0

超时获取是解决此问题的最佳方法吗?

是的,get(timeout)在 Future 对象上完全没问题,如果未来指向的任务已经执行,它将立即返回。如果任务尚未执行或正在执行,那么它将等到超时,这是一个很好的做法。

是否有可能我正在等待尚未放置在线程池中的任务的 get 调用(不是活动的工作人员)

仅当您将任务放在线程池中时才会获得Future对象,因此如果不将任务放在线程池中就无法调用get()任务。是的,有可能该任务尚未由自由工人承担。

于 2013-08-30T07:09:54.747 回答
0

您正在谈论的方法是可以的。但最重要的是,在设置超时阈值之前,您需要知道线程池大小和超时对于您的环境的完美值是多少。进行压力测试,这将揭示您配置为 Threadpool 的一部分的工作线程的数量是否正常。这甚至可以减少超时值。所以这个测试是我觉得最重要的。

获取超时非常好,但如果它抛出 TimeoutException,您应该添加以取消任务。如果您正确执行上述测试并将您的线程池大小和超时值设置为理想值,那么您甚至可能不需要从外部取消任务(但您可以将其作为备份)。是的,有时在取消任务时,您最终可能会取消尚未被 Executor 接收的任务。

于 2013-08-30T07:31:43.923 回答
0

您当然可以使用取消任务

任务取消(真)

这是完全合法的。但是,如果它是 "RUNNING" ,这将中断线程

如果线程正在等待获取内在锁,则“中断”请求除了设置线程的中断状态外没有任何作用。在这种情况下,您无法做任何事情来阻止它。为了发生中断,线程应该通过获取它正在等待的锁来从“阻塞”状态中出来(这可能需要超过 5 分钟)。这是使用“内在锁定”的限制。

但是,您可以使用显式锁定类来解决此问题。您可以使用“Lock”接口的“lockInterruptibly”方法来实现这一点。“lockInterruptibly”将允许线程尝试获取锁,同时保持对中断的响应。这是一个小例子来实现这一点:

public void workWithExplicitLock()throws InterruptedException{
Lock lock = new ReentrantLock();
lock.lockInterruptibly()();
try {
// work with shared object state
} finally {
lock.unlock();
}

}

于 2013-08-30T10:27:12.487 回答