2

我有一个process在我的两个类中命名的方法,可以说CLASS-A and CLASS-B。现在在下面的循环中,我process method依次调用我的两个类,意思是一个一个,它工作正常,但这不是我正在寻找的方式。

for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
    final Map<String, String> response = entry.getPlugin().process(outputs);

    // write to database
    System.out.println(response);
}

有什么办法,我可以用多线程的方式调用我的两个类的 process 方法。这意味着一个线程将调用 CLASS-A 的处理方法,第二个线程将调用 CLASS-B 的处理方法。

然后在那之后,我考虑将方法返回的数据写入process数据库。所以我可以再多一个线程来写入数据库。

下面是我以多线程方式提出的代码,但不知何故它根本没有运行。

public void writeEvents(final Map<String, Object> data) {

    // Three threads: one thread for the database writer, two threads for the plugin processors
    final ExecutorService executor = Executors.newFixedThreadPool(3);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);

    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

但是如果我删除最后一行executor.awaitTermination(5, TimeUnit.MINUTES);然后它开始运行良好,一段时间后,我总是得到这样的错误 -

JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.

任何人都可以帮助我找出问题所在以及我在上面的代码中做错了什么吗?如果我按顺序运行,那么我不会收到任何错误并且工作正常。

与我正在做的事情相比,还有什么更好的方法吗?因为将来我可以拥有多个插件处理器,而不是两个。

我想做的是-以多线程方式调用我的两个类的进程方法,然后写入数据库,因为我的进程方法将返回一个 Map。

任何帮助将不胜感激。如果可能的话,我正在寻找一个可行的例子。谢谢您的帮助,

4

2 回答 2

1

您粘贴的代码片段几乎没有问题,如果您修复它们,这应该可以工作。
1.您正在使用无限循环从阻塞队列中获取元素并尝试使用未来来打破它。这绝对不是一个好方法。这种方法的问题是您的数据库线程可能永远不会运行,因为它可能会在调用者线程中运行的未来任务甚至在它运行之前被取消。这是容易出错的。
- 您应该运行 while 循环固定的次数(您已经知道有多少生产者或您将获得多少次响应)。

  1. 此外,提交给执行器服务的任务应该是独立的任务……这里你的数据库任务依赖于其他任务的执行……如果你的执行策略发生变化,这也可能导致死锁……例如,如果你使用单线程池执行器如果数据库线程被调度,它只会阻塞等待生产者在队列中添加数据。

    • 一个好方法是在同一个线程中创建检索数据和更新数据库的任务。
    • 或者先检索所有响应,然后并行执行数据库操作

    公共无效写事件(最终地图数据){

    final ExecutorService executor = Executors.newFixedThreadPool(3);       
    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);
    
    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                try {
                    final Map<String, String> response = entry.getPlugin().process(outputs);
                    //process the response and update database.
                    System.out.println(map);
                } catch (Throwable e) {
                    //handle execption
                } finally {
                    //clean up resources
                }               
            }
        });
    }
    

    // 这将等待正在运行的线程完成..这是一个有序的关闭。executor.shutdown(); }

于 2013-08-28T05:48:56.320 回答
0

好的,这是我上面建议的评论的一些代码。免责声明:我不确定它是否有效甚至编译,或者它是否解决了问题。但我们的想法是控制取消过程,而不是依赖future.cancel我怀疑可能会导致问题的过程。

class CheckQueue implements Runnable {
    private volatile boolean cancelled = false;
    public void cancel() { cancelled = true; }
    public void run() {
        Map<String, String> map;
        try {
            while(!cancelled) {
                // blocks until a map is available in the queue, or until interrupted
                map = queue.take();
                if (cancelled) break;
                // write map to database
                System.out.println(map);
        } catch (InterruptedException e) {
        }
        while((map = queue.poll()) != null) {
            // write map to database
            System.out.println(map);
        }
    }
}

CheckQueue queueChecker = new CheckQueue ();
Future<?> future = executor.submit(queueChecker);

// this interrupts the database thread, which sends it into its catch block
// where it processes the rest of the queue and exits
queueChecker.cancel();
于 2013-08-27T22:57:47.867 回答