2

以下SSCCE显示从服务器(例如)获取记录。ExecutorService用于创建ThreadPool2 个线程,我用Timeout3 秒调用了所有这些线程。我故意让一些任务失败。

现在我的问题是,如何获取失败任务的 EmpID?

主类:

public class MultiThreadEx {
    public static void main(String[] args) {

        String[] empIDArray = {
                "100", "200", "300", "400", "500", 
                "600", "700", "800", "900", "1000", 
                "1100", "1200", "1300", "1400", "1500"
            };

        List<ThreadTaskEach> taskList = new ArrayList<ThreadTaskEach>();
        try {
            for(String empID : empIDArray) {
                taskList.add(new ThreadTaskEach(empID));
            }
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
        }

        List<Future<Map<String, String>>> futureList = null;
        try {
            ExecutorService service = Executors.newFixedThreadPool(2);
            futureList = service.invokeAll(taskList, 3, TimeUnit.SECONDS);
            service.shutdown();
        } catch(InterruptedException ie) {
            System.out.println("Exception occured: " + ie.getMessage());
        }

        for(Future<Map<String, String>> future : futureList) {
            try {
                Map<String, String> resultMap = future.get();

                for(String key : resultMap.keySet()) {
                    System.out.println(resultMap.get(key));
                }

            } catch(ExecutionException ee) {
                System.out.println("Exception occured: " + ee.getMessage());
            } catch (InterruptedException ie) {
                System.out.println("Exception occured: " + ie.getMessage());
            } catch(CancellationException e) {
                System.out.println("Exception occured: " + e.getMessage());
            }
        }

    }
}

线程类

class ThreadTaskEach implements Callable<Map<String, String>>{

    private String empID;

    public ThreadTaskEach(String empID) {
        this.empID = empID;
    }

    @Override
    public Map<String, String> call() throws Exception {
        try {
            return prepareMap(empID);
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
            throw new Exception("Exception occured: " + e.getMessage());
        }
    }

    private Map<String, String> prepareMap(String empID) throws InterruptedException {
        Map<String, String> map = new HashMap<String, String>();

        if(Integer.parseInt(empID) % 500 == 0) {
            Thread.sleep(5000);
        }

        map.put(empID, empID + ": " + Thread.currentThread().getId());

        return map;
    }
}

在上面的代码 500, 1000 .. 未能在 3 秒内完成。

Map<String, String> resultMap = future.get();

因此,当我为这些任务说 future.get() 时,我得到了CancellationException. 但是如何从任务中获取 EmpID 呢?

4

1 回答 1

6

除了使用invokeAll,您可以一个一个地submit完成每个任务并将每个未来存储在地图中:

Future<?> f = executor.submit(task);
map.put(f, task.getId());

现在当你尝试获取时,如果出现异常,你可以使用地图返回到id。但是,您需要为每个设置超时,future.get()这对于您的用例可能不实用。

另一种方法是使用规范invokeAll which 保证以与提交的任务相同的顺序返回期货

返回代表任务的 Futures 列表,其顺序与给定任务列表的迭代器生成的顺序相同

只要你使用一个List,迭代顺序是固定的,你可以匹配两个列表:

    for (int i = 0; i < futureList.size(); i++) {
        Future<Map<String, String>> future = futureList.get(i)
        try {
            Map<String, String> resultMap = future.get();

            for(String key : resultMap.keySet()) {
                System.out.println(resultMap.get(key));
            }
        } catch(ExecutionException ee) {
            System.out.println("Exception in task " + taskList.get(i).getId());
        }
    }

只要确保您使用具有稳定迭代顺序的集合(ArrayList 很好)。

于 2013-06-12T18:53:09.633 回答