6

我正在计算等待串行事件发生超时的未来:

Future<Response> future = executor.submit(new CommunicationTask(this, request));
response = new Response("timeout");
try {
  response = future.get(timeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
  future.cancel(true);
  log.info("Execution time out." + e);
} catch (ExecutionException e) {
  future.cancel(true);
  log.error("Encountered problem communicating with device: " + e);
}

该类CommunicationTask已实现Observer接口以侦听来自串行端口的更改。

问题是从串行端口读取相对较慢,即使发生串行事件,时间也会耗尽并TimeoutException抛出 a。当串行事件发生时,我能做些什么来停止我未来的超时时钟?

我试过了,AtomicReference但这并没有改变任何东西:

public class CommunicationTask implements Callable<Response>, Observer {
  private AtomicReference atomicResponse = new AtomicReference(new Response("timeout"));
  private CountDownLatch latch = new CountDownLatch(1);
  private SerialPort port;

  CommunicationTask(SerialCommunicator communicator, Request request) {
    this.communicator = communicator;
    this.message = request.serialize();
    this.port = communicator.getPort();
  }

  @Override
  public Response call() throws Exception {
    return query(message);
  }

  public Response query(String message) {
    communicator.getListener().addObserver(this);
    message = message + "\r\n";
    try {
      port.writeString(message);
    } catch (Exception e) {
      log.warn("Could not write to port: " + e);
      communicator.disconnect();
    }
    try {
      latch.await();
    } catch (InterruptedException e) {
      log.info("Execution time out.");
    }
    communicator.getListener().deleteObserver(this);
    return (Response)atomicResponse.get();
  }

  @Override
  public void update(Observable o, Object arg) {
    atomicResponse.set((Response)arg);
    latch.countDown();
  }
}

我能做些什么来解决这个问题?

编辑:

好的,我有一个错误。atomicResponse在设置函数之前,我正在倒数我的闩锁update。现在它似乎奏效了,但仍然存在这样的问题,如果这种方法是正确的方法吗?

4

2 回答 2

1

您是否探索过 google 的 Guava 'future listener',它基于 Async future,希望以下代码片段对您有所帮助....

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class SyncFutureExample {
    public static void main(String[] args) {
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        ListenableFuture<String> lf  = service.submit(new CommuncationTask());

        //no need for future.get() or future.get(10,time minutes)


        //add callbacks(= async future listeners) ....
        Futures.addCallback(lf, new FutureCallback<String>() {
              public void onSuccess(String input) {
                System.out.println(input + " >>> success");//gets a callback once task is success
              }
              public void onFailure(Throwable thrown) {
                  System.out.println(thrown + " >>> failure");//gets a callback if task is failed
              }
            });
        service.shutdown();
    }

}

class CommuncationTask implements Callable<String>{

    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(15);// some dummy serious task .............
        return "TaskDone";
    }


}
于 2015-03-21T19:16:43.417 回答
0

希望这会有所帮助。我不会对此发表评论,希望代码中的一切都清楚。

class CommunicationTask implements Callable<String>, Observer {
    volatile boolean ignoreTimeoutException;

    public CommunicationTask(SerialCommunicator communicator, Request request) {
    }

    public String call() throws Exception {
        Thread.sleep(1000);
        return "done";
    }

    public void update(Observable o, Object arg) {
        ignoreTimeoutException = true;
    }
}

class FutureCommunicationTask extends FutureTask<String> {
    private CommunicationTask ct;

    public FutureCommunicationTask(CommunicationTask ct) {
        super(ct);
        this.ct = ct;
    }

    public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return super.get(timeout, unit);
        } catch (TimeoutException e) {
            if (ct.ignoreTimeoutException) {
                return get();  //  no timeout wait 
            }
            throw e;
        }
    }
}

public class Test {

    public static void main(String[] args) throws Exception {
        CommunicationTask ct = new CommunicationTask(null, null);
        FutureTask<String> fct = new FutureCommunicationTask(ct);
        ExecutorService ex = Executors.newSingleThreadExecutor();
        ex.execute(fct);
//      uncomment this line and timeout will be cancelled 
        ct.update(null, null);  
        String res = fct.get(1, TimeUnit.MILLISECONDS);
        System.out.println(res);
    }
}
于 2012-12-03T16:08:25.047 回答