
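I want to run a long-running operation and be able to see which of the following stages it is in:

1) Not yet run

2) Running

3) Completed successfully

4) Terminated with an exception

I wrote the code below, which looks overly complicated. It uses three classes: Work, ThreadPoolExecutor, and FutureTask<?>, of which Work is hand-written.

At the same time, Work partially duplicates the functionality of FutureTask<?> (storing the exception, which the Future also does internally, but keeps hidden).

The question is: is there a way to do the same thing in a few lines using predefined classes from Java, Groovy, GPars, Apache, etc.?

The code: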

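import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;

public class AsyncRunAndTrackState {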

   public static class Stub implements Runnable {
      @Override
      public void run() {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   public static class Work implements Runnable {

      private Exception exception;

      private boolean active;

      public synchronized Exception getException() {
         return exception;
      }

      public synchronized void setException(Exception exception) {
         this.exception = exception;
      }

      public synchronized boolean isActive() {
         return active;
      }

      public synchronized void setActive(boolean active) {
         this.active = active;
      }

      @Override
      public final void run() {

         setActive(true);
         setException(null);

         try {
            runImpl();
         }
         catch (Exception e) {
            setException(e);
         }
         finally {
            setActive(false);
         }

      }

      protected void runImpl() {
         System.out.println("Before");

         try {
            Thread.sleep(10000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

         throw new RuntimeException("Some exception occurred");

         //System.out.println("After");
      }
   }

   static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

   static FutureTask<?> future;

   static Work work;

   public static void main(String[] args) {

      for(int i=0; i<10; ++i) {
         executor.submit(new Stub());
      }

      work = new Work();
      future = (FutureTask<?>) executor.submit(work);

      while(true) {

         System.out.println(String.format("future.done = %s, future.cancelled = %s", future.isDone(), future.isCancelled()));
         System.out.println(String.format("work.active = %s, work.exception = %s", work.isActive(), work.getException()));
         System.out.println();

         try {
            Thread.sleep(500);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

      }

   }
}

2 Answers


I usually use dataflow queues to notify about state changes of asynchronous activities.
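As a rough illustration of that idea in plain Java, here is a minimal sketch that publishes each state change to a queue. It swaps in `java.util.concurrent.BlockingQueue` for a GPars dataflow queue; that substitution, the `State` enum, and the `tracked` helper are assumptions made for the example, not part of any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class StateQueueSketch {

    // Hypothetical state names mirroring the four stages in the question.
    enum State { NOT_STARTED, RUNNING, COMPLETED, FAILED }

    // Wraps a task so that every state change is published to the queue.
    static Runnable tracked(Runnable task, BlockingQueue<State> states) {
        states.add(State.NOT_STARTED);
        return () -> {
            states.add(State.RUNNING);
            try {
                task.run();
                states.add(State.COMPLETED);
            } catch (RuntimeException e) {
                states.add(State.FAILED);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<State> states = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(tracked(() -> {
            throw new RuntimeException("Some exception occurred");
        }, states));

        // The observer blocks on the queue instead of polling in a sleep loop.
        State s;
        do {
            s = states.take();
            System.out.println("state = " + s);
        } while (s != State.COMPLETED && s != State.FAILED);

        executor.shutdown();
    }
}
```

The observer is woken up on each change rather than polling every 500 ms, which is the main appeal of the queue-based approach.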

answered 2015-12-11T10:56:49.900

If you subclass FutureTask, it might work. That would lead to code something like this (if I understood everything correctly):

package experiment;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class Work implements Runnable{

    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("Some exception occurred");
    }

    public static void main(String[] args){
        Work work = new Work();
        MyFutureTask<Object> future = new MyFutureTask<Object>(work, null);
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(future);

         while(true) {

             System.out.println(String.format("future.done = %s, future.cancelled = %s", future.isDone(), future.isCancelled()));
             System.out.println(String.format("work.active = %s, work.exception = %s", future.isActive(), future.retrieveExeption()));
             System.out.println();

             try {
                Thread.sleep(500);
             } catch (InterruptedException e) {
                e.printStackTrace();
             }

          }

    }
}

class MyFutureTask<A> extends FutureTask<A> {

    // volatile: written by the worker thread, read by the polling thread
    private volatile Exception ex;

    @Override
    protected void done() {
        super.done();
        try {
            if (!isCancelled()) get();
        } catch (ExecutionException e) {
            // Exception occurred, deal with it
            ex = e;
        } catch (InterruptedException e) {
            // Shouldn't happen, we're invoked when computation is finished
            throw new AssertionError(e);
        }
    }

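    // Note: this is also true while the task is still queued, so it
    // does not distinguish "not yet run" from "running".
    public boolean isActive(){
        return !this.isDone() && !this.isCancelled();
    }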

    @Override
    protected boolean runAndReset(){
        this.ex = null;
        return super.runAndReset();
    }

    @Override
    public void run(){
        this.ex = null;
        super.run();
    }

    public Exception retrieveExeption(){
        return ex;
    }

    public MyFutureTask(Runnable runnable, A result) {
        super(runnable, result);
    }

}

It's still a lot of code, but you can reuse MyFutureTask.

This way of handling exceptions is described in "How to catch exceptions in FutureTask".
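For what it's worth, the four stages from the question could be told apart with a slightly different subclass. The sketch below is only one possible variant, not the code from this answer: `TrackedTask`, its `State` enum, and the `state()` and `failure()` methods are hypothetical names invented for the example. It relies only on `FutureTask.run()`, `isDone()`, and `get()`, plus a `started` flag set when `run()` begins.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class TrackedTaskSketch {

    // Hypothetical states: the four from the question plus cancellation.
    enum State { NOT_STARTED, RUNNING, COMPLETED, FAILED, CANCELLED }

    static class TrackedTask<V> extends FutureTask<V> {
        // Set by the worker thread, read by observers, hence volatile.
        private volatile boolean started;

        TrackedTask(Runnable runnable, V result) {
            super(runnable, result);
        }

        @Override
        public void run() {
            started = true;
            super.run();
        }

        State state() {
            if (!isDone()) {
                return started ? State.RUNNING : State.NOT_STARTED;
            }
            try {
                get(); // the task is done, so this returns promptly
                return State.COMPLETED;
            } catch (ExecutionException e) {
                return State.FAILED;
            } catch (CancellationException e) {
                return State.CANCELLED;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return State.RUNNING; // interrupted while the task was completing
            }
        }

        // Returns the exception thrown by the task, or null if there is none.
        Throwable failure() {
            if (!isDone()) {
                return null;
            }
            try {
                get();
                return null;
            } catch (ExecutionException e) {
                return e.getCause();
            } catch (CancellationException e) {
                return null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        TrackedTask<Void> task = new TrackedTask<>(() -> {
            throw new RuntimeException("Some exception occurred");
        }, null);

        System.out.println("before submit: " + task.state());
        executor.execute(task); // execute() runs the task itself, without wrapping it
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("after run: " + task.state() + ", cause = " + task.failure());
    }
}
```

Because the exception is read back through `get()` on demand, no separate exception field has to be kept in sync with the task.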

answered 2015-12-10T18:54:04.653