我正在做一个项目,我将在其中拥有不同的捆绑包。让我们举个例子,假设我有 5 个 Bundles,每个包都有一个方法 name process
。
以下是我应该做的事情-
- 我需要
process
使用多线程代码并行调用所有这 5 个 Bundles 方法,然后写入数据库。我不确定这样做的正确方法是什么?我应该有五个线程吗?每个捆绑一个线程?但是在那种情况下会发生什么,假设如果我有 50 个包,那么我将有 50 个线程? - 而且,我也想有超时功能。如果任何捆绑包花费的时间超过了我们设置的阈值,那么它应该超时并记录为该捆绑包花费了很多时间的错误。
我所做的以下尝试很可能是有缺陷的,并且错误处理绝不是完整的。但不知何故,我总是在这一行得到一个错误-
pool.invokeAll
错误是-
The method invokeAll(Collection<? extends Callable<T>>, long, TimeUnit) in the type ExecutorService is not applicable for the arguments (List<ModelFramework.ProcessBundleHolderEntry>, int, TimeUnit)
下面是我的方法,它将process method
以多线程方式调用所有包。
public void processEvents(final Map<String, Object> eventData) {
ExecutorService pool = Executors.newFixedThreadPool(5);
List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
entries.add(processBundleHolderEntry);
}
try {
// somehow I always get an error at invokeAll method. Is there anything wrong?
List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
for (int i = 0; i < futures.size(); i++) {
// This works since the list of future objects are in the
// same sequential order as the list of entries
Future<Object> future = futures.get(i);
ProcessBundleHolderEntry entry = entries.get(i);
if (!future.isDone()) {
// log error for this entry
}
}
} catch (InterruptedException e) {
// handle this exception!
}
}
其次,我在 ModelFramework 类中添加的线程的 Callable 实现
public class ProcessBundleHolderEntry implements Callable {
private BundleRegistration.BundlesHolderEntry entry;
private Map<String, String> outputs;
public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
this.entry = entry;
this.outputs = outputs;
}
public Object call() throws Exception {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to the database.
System.out.println(response);
return response;
}
}
谁能帮我解决我遇到的错误?还有谁能告诉我上述方法是否有任何问题,或者有没有更好更有效的方法来做同样的事情?
任何帮助将不胜感激。