5

我使用围绕 2 线程 FixedThreadPool ExecutorService 包裹的 CompletionService 提交了一些 Future 任务,我设置然后设置一个等于提交的任务数量的循环,并使用 completionservice.take() 等待它们全部完成或失败。麻烦是偶尔它永远不会完成(但我不知道为什么)所以我将 take() 方法更改为 poll(300,Timeout.SECONDS),想法是如果一项任务需要超过 5 分钟才能完成poll 将失败,然后最终将退出循环,我可以遍历所有期货并调用 future.cancel(true) 以强制取消有问题的任务。

但是当我运行代码并且它挂起时,我看到轮询每 5 分钟连续失败一次并且没有更多任务运行,所以我假设这两个工作人员以某种方式陷入僵局并且永远不会完成,并且永远不允许启动其他任务。因为超时是 5 分钟,并且还有 1000 个任务要运行,所以打破循环所花费的时间太长,所以取消了作业。

所以我想要做的是中断/强制取消当前任务,如果没有在 5 分钟内完成,但我看不到任何方法。

此代码示例显示了我所说的简化版本

import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
    {
        if(number==3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch(InterruptedException tie)
            {

            }
        }
        return true;
    }
}

输出

Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
4

3 回答 3

5

我认为我已经解决了,基本上如果发生超时,我会遍历我的未来对象列表并找到第一个尚未完成的对象,然后强制取消。看起来不那么优雅,但似乎工作。

我更改了池的大小,只是为了显示更好地演示解决方案的输出,但也适用于 2 线程池。

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:"+t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println(new Date()+":Worker Timedout:");
                    //So lets cancel the first futures we find that havent completed
                    for(Future future:futures)
                    {
                        System.out.println("Checking future");
                        if(future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date()+":Worker Completed:");
                        }
                        else if(result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date()+":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date()+":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if(number==3)
            {
                Thread.sleep(50000);
            }
        }
        catch(InterruptedException ie)
        {
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}

输出是

Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
于 2011-03-10T20:55:40.747 回答
2

在您的工作人员示例中,您的 Callable 正在阻止支持中断的呼叫。如果您的真实代码在内部锁(synchronized块)上死锁,您将无法通过中断取消它。相反,您可以使用显式锁 ( java.util.concurrent.Lock),它允许您指定要等待获得锁的时间。如果线程超时等待锁定,可能是因为它遇到了死锁情况,它可以中止并显示错误消息。

顺便说一句,在你的例子中,你Callable不应该吞咽InterruptedException。您应该将其传递(重新抛出,或添加InterruptedException到方法声明的 throws 行),或者在 catch 块中,重置线程的中断状态(使用Thread.currentThread().interrupt())。

于 2011-03-10T17:15:29.657 回答
1

你可以随时调用future.get(timeout...)
如果它还没有完成它会返回超时异常......然后你可以调用future.cancel().

于 2012-10-01T22:34:29.773 回答