1

我有一个非常基本的线程池代码。它调用一个位于linkedblockingqueue 中的工作对象池。该代码只是通过重新循环工作对象来打印出输入数据。

我发现以下一致的死锁/冻结:

public class throttleheapthreadpool{
            private quoteworkerobject[] channels;
            private LinkedBlockingQueue<quoteworkerobject> idlechannels;
            public throttleheapthreadpool(int poolsize,int stocks){
        channels=new quoteworkerobject[poolsize];
        idlechannels=new LinkedBlockingQueue<quoteworkerobject>();

        for(int i=1;i<poolsize;i++){
            channels[i]=new quoteworkerobject(idlechannels);
            idlechannels.add(channels[i]);//All WORKERS to Idle pool to start
        }
    }

    public void execute(Integer quote){
        quoteworkerobject current = null;
        try {
                        //extract worker from pool
            current = (quoteworkerobject)idlechannels.take();
            current.put(quote);
        } catch (InterruptedException e) {
        }
    }

    class quoteworkerobject{
        LinkedBlockingQueue<Integer> taskqueue=new LinkedBlockingQueue<Integer>(); 
        Thread quotethread=null;
        LinkedBlockingQueue<quoteworkerobject> idle=null;
        @SuppressWarnings("unchecked")
        public quoteworkerobject(LinkedBlockingQueue<quoteworkerobject> idlechannels){
            this.idle=idlechannels;
            Runnable r=new Runnable(){
                public void run() {
                    insertquote();
                }
            };
            quotethread=new Thread(r);
            quotethread.start();//spawn a thread from the worker 
        }
        public void put(Integer  quote){
            taskqueue.add(quote);
        }
        public void insertquote(){
            try{
                Integer thisquote=taskqueue.take();
                idle.add(this);
            }
            catch(Exception ex){
            }

        }

    }

    public static void main(String[] args){
        throttleheapthreadpool pool=new throttleheapthreadpool(5,200);
        Random randomGenerator = new Random();

        for(int node=0;node < 20;node++){
            int d=randomGenerator.nextInt(5*200);
            pool.execute(d);
        }
    }

}   

此代码在第 8 次执行时始终冻结 - 在点 current = (quoteworkerobject)idlechannels.take();

上面有什么问题?

4

2 回答 2

3

这正是我(讨厌?)不喜欢使用这样的代码的原因。您应该考虑让您/我们的生活更轻松,并编写即使在几个月后您也可以查看并证明它的代码:相应地命名您的变量,编写简短的文档或解释等。我花了 25 分钟来重构,因为我不明白发生了什么。

我添加了一个小的重构,我还添加了一些断点,看看代码 - 解释在里面。但问题在于 insertQuote 方法——它完成得太早了。

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;

public class Pool {
private Worker[] workers;
private LinkedBlockingQueue<Worker> workerQueue;



/**
 * Create a pool of 5 workers and a {@link LinkedBlockingQueue} to store them
 */
public Pool(int poolsize) {
    //1. First you get here : you create a Pool of 5 Worker Threads and a Queue to store them
    System.out.println("1.");
    workers = new Worker[poolsize];
    workerQueue = new LinkedBlockingQueue<Worker>();

    for (int i = 0; i < poolsize; i++) {
        //2. You instantiate 5 worker Threads and place each of them on the Queue
        System.out.println("2.");
        workers[i] = new Worker(workerQueue);
        workerQueue.add(workers[i]);
    }
}

public void execute(Integer quote) {
    Worker current = null;
    try {
        // extract worker from pool
        //6. Get a worker from the Queue
        System.out.println("6.");
        current = workerQueue.take();
        current.put(quote);
    } catch (InterruptedException e) {
    }
}


/**
 * 
 *
 */
class Worker {
    LinkedBlockingQueue<Integer> taskqueueForEachWorker = new LinkedBlockingQueue<Integer>();
    LinkedBlockingQueue<Worker> workerQueue = null;

    public Worker(LinkedBlockingQueue<Worker> idlechannels) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                //3. You call the insert quote method
                System.out.println("3.");
                insertquote();
            }
        }).start();
    }

    public void put(Integer quote) {
        //7. Add a task for each Thread to do
        System.out.println("7.");
        taskqueueForEachWorker.add(quote);
    }

    //TODO The problem is here: After you execute this line : workerQueue.add(this); this method ends, NO MORE worker Threads are put on the queue,
    // thus at point 6 you block, well because there are no more worker Threads an no one add them.

    public void insertquote() {
        try {
            // 4. You try to take an Integer from the Pool of tasks from rach Thread, but there is nothing yet - it is empty, thus each Thread (worker)
            // blocks here, waiting for a task
            System.out.println("4.");
            Integer thisquote = taskqueueForEachWorker.take(); // This will successed only after 7.
            workerQueue.add(this);
        } catch (Exception ex) {
        }
    }
}

public static void main(String[] args) {
    Pool pool = new Pool(5);
    Random randomGenerator = new Random();

    for (int node = 0; node < 20; node++) {
        int d = randomGenerator.nextInt(5 * 200);
        System.out.println("5.");
        pool.execute(d);
    }
}

}

输出将是 1. 2. 3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6.

看到最后一行是 6。如果这里阻塞,因为方法 insertQuote 已退出,因此队列现在为空,则所有工作线程都已被占用。

在我看来,因为您的工作线程每个都使用一个单独的队列,所以您应该实现“工作窃取”模式或Deque。也调查一下。

于 2012-08-07T08:40:18.563 回答
1

请参阅下面我的重构(仍然不完美,但更具可读性)。您的问题如下:

  • 您创建 4 个工人(在 的构造函数中从 1 循环到 5 throttleheapthreadpool
  • 每个工作人员在单独的线程中运行insertquote 一次并返回空闲池

因此,总体而言,您提交了 4 个已完成的作业,并且工作人员返回队列,然后您再给他们 4 个作业(总共 8 个),除了他们不使用该作业,因为他们的insertquote方法已经退出。

解决方案:insertquote在while循环中运行:

public void insertquote() {
    try {
        while (true) {
            taskqueue.take();
            idle.add(this);
        }
    } catch (Exception ex) {
    }
}

有关信息,这是我当前版本的代码:

public class ThrottleHeapThreadPool {

    private final BlockingQueue<QuoteWorkerObject> idlechannels = new LinkedBlockingQueue<QuoteWorkerObject>();

    public static void main(String[] args) {
        ThrottleHeapThreadPool pool = new ThrottleHeapThreadPool(5, 200);
        Random randomGenerator = new Random();

        for (int node = 0; node < 20; node++) {
            int d = randomGenerator.nextInt(5 * 200);
            pool.execute(d);
        }
    }

    public ThrottleHeapThreadPool(int poolsize, int stocks) {

        for (int i = 1; i < poolsize; i++) {
            QuoteWorkerObject worker = new QuoteWorkerObject(idlechannels);
            idlechannels.add(worker);//All WORKERS to Idle pool to start
            worker.init();
        }
    }

    public void execute(Integer quote) {
        try {
            //extract worker from pool
            QuoteWorkerObject worker = idlechannels.take();
            worker.put(quote);
        } catch (InterruptedException e) {
        }
    }

    class QuoteWorkerObject {

        private final BlockingQueue<Integer> taskqueue = new LinkedBlockingQueue<Integer>();
        private final BlockingQueue<QuoteWorkerObject> idle;

        @SuppressWarnings("unchecked")
        public QuoteWorkerObject(BlockingQueue<QuoteWorkerObject> idlechannels) {
            this.idle = idlechannels;
        }

        public void init() {
            new Thread(new Runnable() {
                public void run() {
                    insertquote();
                }
            }).start();
        }

        public void put(Integer quote) {
            taskqueue.add(quote);
        }

        public void insertquote() {
            try {
                while (true) {
                    taskqueue.take();
                    idle.add(this);
                }
            } catch (Exception ex) {
            }
        }
    }
}
于 2012-08-06T17:39:00.033 回答