0

我正在开发一个项目,其中我将拥有不同的捆绑包/模型。举个例子,假设我有 4 个包,每个包都有一个方法名称process

以下是我应该做的事情-

  1. 我需要process method使用多线程并行调用所有这 4 个捆绑包,并且process method在每个捆绑包中都会返回一个地图,然后将此地图写入同一个线程中的数据库或任何最好的方法(我不确定这是正确的方法)。
  2. 而且我还想在线程级别启用某种超时功能。这意味着如果任何 Bundle 需要花费大量时间来执行,则该 Bundle 线程应该超时并记录为错误,说明该特定捆绑包超时,因为它需要很多时间。

我所做的以下尝试很可能是有缺陷的,并且错误处理绝不是完整的。任何人都可以指导我在错误处理案例中应该做什么吗?

下面是我的方法,它将process method以多线程方式调用所有包。

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}

其次,为您的线程实现 Callable :

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

谁能告诉我上述方法是否有任何问题,或者有没有更好更有效的方法来做同样的事情?我不确定是否也存在任何线程安全问题。

任何帮助将不胜感激。

4

1 回答 1

1

您的代码中唯一的共享对象是eventData:只要在此方法运行时未对其进行修改(或者如果地图及其内容是线程安全的并且更改已安全发布),您应该没问题。

关于任务的异常处理,您通常会:

try {
    future.get();
} catch (ExecutionException e) {
    Throwable exceptionInFuture = e.getCause();
    //throw, log or whatever is appropriate
}

关于中断异常:这意味着您正在执行该方法的线程已被中断。你需要做什么取决于你的用例,但你通常应该停止你正在做的事情,比如:

} catch (InterruptedException e) {
    pool.shutdownNow(); //cancels the tasks
    //restore interrupted flag and exit
    Thread.currentThread.interrupt();
    //or rethrow the exception
    throw e;
}

注意:线程池的目的是重用——你应该将执行器服务声明为一个(私有最终)实例变量,而不是每次processEvents调用方法时都创建一个。

于 2013-08-31T07:22:21.230 回答