我有一个process
在我的两个类中命名的方法,可以说CLASS-A and CLASS-B
。现在在下面的循环中,我process method
依次调用我的两个类,意思是一个一个,它工作正常,但这不是我正在寻找的方式。
for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to database
System.out.println(response);
}
有什么办法,我可以用多线程的方式调用我的两个类的 process 方法。这意味着一个线程将调用 CLASS-A 的处理方法,第二个线程将调用 CLASS-B 的处理方法。
然后在那之后,我考虑将方法返回的数据写入process
数据库。所以我可以再多一个线程来写入数据库。
下面是我以多线程方式提出的代码,但不知何故它根本没有运行。
public void writeEvents(final Map<String, Object> data) {
// Three threads: one thread for the database writer, two threads for the plugin processors
final ExecutorService executor = Executors.newFixedThreadPool(3);
final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();
@SuppressWarnings("unchecked")
final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);
for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
executor.submit(new Runnable () {
public void run() {
final Map<String, String> response = entry.getPlugin().process(outputs);
// put the response map in the queue for the database to read
queue.offer(response);
}
});
}
Future<?> future = executor.submit(new Runnable () {
public void run() {
Map<String, String> map;
try {
while(true) {
// blocks until a map is available in the queue, or until interrupted
map = queue.take();
// write map to database
System.out.println(map);
}
} catch (InterruptedException ex) {
// IF we're catching InterruptedException then this means that future.cancel(true)
// was called, which means that the plugin processors are finished;
// process the rest of the queue and then exit
while((map = queue.poll()) != null) {
// write map to database
System.out.println(map);
}
}
}
});
// this interrupts the database thread, which sends it into its catch block
// where it processes the rest of the queue and exits
future.cancel(true); // interrupt database thread
// wait for the threads to finish
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
//log error here
}
}
但是如果我删除最后一行executor.awaitTermination(5, TimeUnit.MINUTES);
然后它开始运行良好,一段时间后,我总是得到这样的错误 -
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
任何人都可以帮助我找出问题所在以及我在上面的代码中做错了什么吗?如果我按顺序运行,那么我不会收到任何错误并且工作正常。
与我正在做的事情相比,还有什么更好的方法吗?因为将来我可以拥有多个插件处理器,而不是两个。
我想做的是-以多线程方式调用我的两个类的进程方法,然后写入数据库,因为我的进程方法将返回一个 Map。
任何帮助将不胜感激。如果可能的话,我正在寻找一个可行的例子。谢谢您的帮助,