0

我正在做一个项目,我将在其中拥有不同的捆绑包。让我们举个例子,假设我有 5 个 Bundles,每个包都有一个方法 name process

以下是我应该做的事情-

  1. 我需要process使用多线程代码并行调用所有这 5 个 Bundles 方法,然后写入数据库。我不确定这样做的正确方法是什么?我应该有五个线程吗?每个捆绑一个线程?但是在那种情况下会发生什么,假设如果我有 50 个包,那么我将有 50 个线程?
  2. 而且,我也想有超时功能。如果任何捆绑包花费的时间超过了我们设置的阈值,那么它应该超时并记录为该捆绑包花费了很多时间的错误。

我所做的以下尝试很可能是有缺陷的,并且错误处理绝不是完整的。但不知何故,我总是在这一行得到一个错误-

pool.invokeAll

错误是-

The method invokeAll(Collection<? extends Callable<T>>, long, TimeUnit) in the type ExecutorService is not applicable for the arguments (List<ModelFramework.ProcessBundleHolderEntry>, int, TimeUnit)

下面是我的方法,它将process method以多线程方式调用所有包。

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {

        // somehow I always get an error at invokeAll method. Is there anything wrong?
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}

其次,我在 ModelFramework 类中添加的线程的 Callable 实现

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

谁能帮我解决我遇到的错误?还有谁能告诉我上述方法是否有任何问题,或者有没有更好更有效的方法来做同样的事情?

任何帮助将不胜感激。

4

2 回答 2

1

我所做的以下尝试很可能是有缺陷的,并且错误处理绝不是完整的。但不知何故,我总是在这一行得到一个错误-

我认为问题是ProcessBundleHolderEntry如果Callable<Object>你想invokeAll(...)返回一个List<Future<Object>>. 我刚刚编译了您的代码,这解决了问题。

真的,在我看来它应该实施Callable<Map<String, String>>。然后调用方法应该返回正确的类型:

public Map<String, String> call() throws Exception {

然后该invokeAll(...)方法将返回 right List<Future<Map<String, String>>

一个不同的(虽然有点奇怪)的想法是thiscall()方法中返回。在从. ProcessBundleHolderEntry implements Callable<ProcessBundleHolderEntry>_ 然后,您无需对条目执行 a 来匹配它。然后,您将条目、输出和响应放在一个对象中。thiscall()get(i)

于 2013-09-05T20:10:14.273 回答
1

没关系,我发现了实际问题

public class ProcessBundleHolderEntry implements Callable {

应定义为

public class ProcessBundleHolderEntry implements Callable<Object> {

为了匹配Object以下声明中的类型

List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);

invokeAll 方法的签名是

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
于 2013-09-05T20:00:48.713 回答