2

我需要在多核机器上处理大量(>1 亿)请求(每个请求是处理数据文件中的一行,并且涉及与远程系统的一些 I/O。虽然细节并不重要,具体任务是从一些数据文件中加载分布式Hazelcast地图)。执行将通过ThreadPoolExecutor处理。一个线程将读取文件,然后将数据提交给多个独立线程以将其放入地图中。该机器有 32 个内核,因此有足够的可用于并行加载地图。

由于请求量很大,创建任务并将它们排队到执行器服务的常用方法是不可行的,因为排队的任务会占用过多的内存。

这带来了ExecutorCompletionService。有了它,当先前的操作完成时将提交一个任务,这通过调用take()(或poll(),如果适用)知道。当执行器服务的所有线程都被使用时,这将正常工作。但是,“加载所有线程”还没有完成。有两个阶段:

  • 填满队列:当池中仍有未使用的线程时,将任务提交给 ExecutorCompletionService 并且不要等到提交更多

  • 喂入队列:一旦线程全部使用完毕,只有在前一个任务完成后才提交任务。因此,行将尽可能快地送入,但不会更快,也不会排队。

上面可以编码,但我想知道上面的逻辑是否已经实现,我不知何故错过了它。我问是因为它看起来是一个常见的场景。

4

2 回答 2

4

您可以BlockingQueue在创建ThreadPoolExecutor. 如果您要避免的只是创建过多的Runnable对象,那么您可以使用 bounded BlockingQueue,例如ArrayBlockingQueue让一个线程将项目推送到队列中,当队列满负荷时该线程将被阻塞。

于 2014-05-29T04:05:19.407 回答
-1

如果我理解您的要求,(如果我错了,请纠正我)然后您需要一种机制,其中有多个任务并且您需要最大n任务并行执行,其他任务应该在队列中等待,但是一旦您提交任务然后你不想闲逛或让线程提交任务忙,它可以继续它的工作

对于我们使用LinkedBlockingQueue和的混合的相同场景Thread,我相信一个简单的函数可以帮助你理解,

private final LinkedBlockingQueue<YourTaskObjType> EnqueuedTasks;

private void initTasksProcessingThreads(int numberOfThreads) 
{
    EnqueuedTasks= new LinkedBlockingQueue<YourTaskObjType>();
    for (int i = 0; i < numberOfThreads; i++) 
    {
        // each thread will run forever and process incoming
        //Change requests
        Thread worker = new Thread(new Runnable() 
        {               
            public void run() 
            {
                while (true) 
                {
                    try 
                    {   
                        YourTaskObjType task = EnqueuedTasks.take(); //This will wait infinitely until tasks are available
                        PerformTask(task); //Your function which will perform the task operation
                    } 
                    catch (InterruptedException e) 
                    {                                                                
                        Thread.currentThread().interrupt();
                        return;
                    } 
                    catch(Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        });         
        worker.start();
    }
}

然后您可以使用一个简单的函数将任务添加到LinkedBlockingQueue,

public void AddTask(YourTaskObjType TaskObj)
{
    EnqueuedTasks.put(TaskObj);                         
}
于 2014-05-29T04:53:23.427 回答