实现你想要的并不难,但你应该意识到,并发和超时都会增加复杂性,尤其是在错误处理方面。
例如,发生超时时正在运行的线程可能会在超时后继续运行。只有通过处理中断信号进行合作的表现良好的线程才能在处理过程中成功停止。
您还必须确保可以并行处理各个捆绑条目,即它们是线程安全的。如果他们在处理过程中修改了某些共享资源,那么您可能会因此得到奇怪的错误。
我还想知道您是否想将数据库写入每个线程。如果是这样,您将需要在写入数据库时处理中断;例如通过回滚事务。
无论如何,要获得线程池和所有线程的总超时,您可以使用ExecutorService
(例如)固定池大小并使用该invokeAll
方法调用所有线程。
下面的尝试很可能是有缺陷的,并且错误处理绝不是完整的,但它应该为您提供一个起点。
首先,为您的线程实现 Callable:
public class ProcessBundleHolderEntry implements Callable {
private BundleRegistration.BundlesHolderEntry entry;
private Map<String, String> outputs;
public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
this.entry = entry;
this.outputs = outputs;
}
public Object call() throws Exception {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to the database.
System.out.println(response);
return response;
}
}
现在,修改后的processEvents
方法:
public void processEvents(final Map<String, Object> eventData) {
ExecutorService pool = Executors.newFixedThreadPool(5);
List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
entries.add(processBundleHolderEntry);
}
try {
List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
for (int i = 0; i < futures.size(); i++) {
// This works since the list of future objects are in the
// same sequential order as the list of entries
Future<Object> future = futures.get(i);
ProcessBundleHolderEntry entry = entries.get(i);
if (!future.isDone()) {
// log error for this entry
}
}
} catch (InterruptedException e) {
// handle this exception!
}
}