我正在做一个项目,我将在其中拥有不同的捆绑包。让我们举个例子,假设我有 5 个 Bundles,每个包都有一个方法 name process
。
现在,我正在process
依次调用所有这 5 个捆绑包的方法,然后我正在写入数据库。但这就是我不想要的。
以下是我正在寻找的东西-
- 我需要
process
使用多线程代码并行调用所有这 5 个 Bundles 方法,然后写入数据库。我不确定这样做的正确方法是什么?我应该有五个线程吗?每个捆绑一个线程?但是在那种情况下会发生什么,假设如果我有 50 个包,那么我将有 50 个线程? - 而且,我也想有超时功能。如果任何捆绑包花费的时间超过了我们设置的阈值,那么它应该超时并记录为该捆绑包花费了很多时间的错误。
我希望这个问题足够清楚。
下面是我到目前为止的代码,它process
依次一个接一个地调用每个包的方法。
public void callBundles(final Map<String, Object> eventData) {
final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.HOLDER);
for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
// calling the process method of a bundle
final Map<String, String> response = entry.getPlugin().process(outputs);
// then write to the database.
System.out.println(response);
}
}
我不确定最好和最有效的方法是什么?而且我不想按顺序写。因为,在未来,我可能会拥有超过 5 个捆绑包。
谁能给我一个例子来说明我该怎么做?我已经尝试过这样做,但不知何故这不是我想要的方式。
任何帮助将不胜感激。谢谢。
更新:-
这就是我想出的——
public void callBundles(final Map<String, Object> eventData) {
// Three threads: one thread for the database writer, five threads for the plugin processors
final ExecutorService executor = Executors.newFixedThreadPool(5);
final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();
@SuppressWarnings("unchecked")
final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.EVENT_HOLDER);
for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
executor.submit(new Runnable () {
public void run() {
final Map<String, String> response = entry.getPlugin().process(outputs);
// put the response map in the queue for the database to read
queue.offer(response);
}
});
}
Future<?> future = executor.submit(new Runnable () {
public void run() {
Map<String, String> map;
try {
while(true) {
// blocks until a map is available in the queue, or until interrupted
map = queue.take();
// write map to database
System.out.println(map);
}
} catch (InterruptedException ex) {
// IF we're catching InterruptedException then this means that future.cancel(true)
// was called, which means that the plugin processors are finished;
// process the rest of the queue and then exit
while((map = queue.poll()) != null) {
// write map to database
System.out.println(map);
}
}
}
});
// this interrupts the database thread, which sends it into its catch block
// where it processes the rest of the queue and exits
future.cancel(true); // interrupt database thread
// wait for the threads to finish
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
//log error here
}
}
但是我还不能在其中添加任何超时功能。而且如果我按原样运行上面的代码,那么它也没有运行。我错过了什么吗?
有人可以帮我吗?