1

这是我第一次尝试创建自己的线程池。该程序有效,除非我尝试停止所有线程。我试图中断所有线程,但由于某种原因它永远不会到达这个段:

if (this.currentThread().isInterrupted())

这是输出:

添加了所有任务
线程 0 收到任务 5
这是任务 #5
线程 2 收到任务 4
这是任务 #4
线程 1 收到任务 3
这是任务 #3
全部完成,线程池关闭
任务 5 现在正在由线程 0 执行
任务2 由线程 0 接收
这是任务 #2
任务 3 现在正在由线程 1 执行
任务 1 由线程 1 接收
这是任务 #1
任务 4 现在正在由线程 2 执行
没有可用的任务...
任务 1 现在正在由线程执行1
没有可用的任务...
任务 2 现在由线程执行 0
没有可用的任务...

该程序应该在调用 stopPool() 方法后停止生成输出。

谁能告诉我这里出了什么问题?

public class CustomThread extends Thread
{
            private int id;
            private Task task;
            private ThreadPool threadpool;

            public CustomThread(int id, Task task, ThreadPool threadpool)
            {
                this.id = id;
                this.task = task;
                this.threadpool = threadpool;
            }

            public synchronized void run()
            {
                while(true)
                {
                    if (this.currentThread().isInterrupted())
                    {
                        System.out.println("Thread " + id + " is halted");
                    }
                    else
                    {
                        if (threadpool.getTaskQueue().isEmpty())
                        {
                            System.out.println("No available tasks...");
                            try 
                            {
                                this.wait();
                            } 
                            catch (InterruptedException e) 
                            {
                                e.printStackTrace();
                            }
                        }
                        else
                        {   
                            task = threadpool.retrieveTask();
                            System.out.println("Task " + task.getID() + " recieved by thread " + id);
                            task.run();
                            task.setState(true);
                            System.out.println("Task " + task.getID() + " now being executed by thread " + id);
                        }
                    }
                }
            }

            public synchronized void wakeUp()
            {
                this.notify();
            }

            public int getThreadID()
            {
                return id;
            }

        }

    import java.util.List;
    import java.util.ArrayList;
    import java.util.Stack;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;


    public class ThreadPool
    {
        private List<CustomThread> thread_list;
        private Stack<Task> task_queue;

        public ThreadPool(int threads)
        {
            thread_list = new ArrayList<CustomThread>();
            task_queue = new Stack<Task>();
            for(int i=0; i<threads; i++)
            {
                CustomThread thread = new CustomThread(i, null, this);
                thread_list.add(thread);
                thread.start();
            }
        }

        //Must use notify() to wake up an idle thread
        public synchronized void add(Task task)
        {
            task_queue.push(task);
            try
            {
                for(CustomThread t : thread_list)
                {
                    t.wakeUp();
                }

            }
            catch (Exception e)
            {
                e.printStackTrace();
            }

        }

        public synchronized void stopPool()
        {
            for(CustomThread t : thread_list)
            {
                t.currentThread().interrupt();
            }

        }

        public synchronized Stack getTaskQueue()
        {
            return task_queue;
        }

        public synchronized Task retrieveTask()
        {
            return task_queue.pop();
        }
    }

public class Task implements Runnable
{
    private int id;
    private boolean finished = false;
    public Task(int id)
    {
        this.id = id;
    }

    public synchronized void run()
    {
            System.out.println("This is task #" + id);

            try
            {
                Thread.sleep(1000);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
    }

    public int getID()
    {
        return id;
    }

    public void setState(Boolean finished)
    {
        this.finished = finished;
    }

    public boolean getState()
    {
        return finished;
    }
}


public class Init 
{
    public static void main(String[] args)
    {
        ThreadPool threadpool = new ThreadPool(3);
        threadpool.add(new Task(1));
        threadpool.add(new Task(2));
        threadpool.add(new Task(3));
        threadpool.add(new Task(4));
        threadpool.add(new Task(5));
        System.out.println("All tasks added");
        {
            try
            {
                Thread.sleep(1000);
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
        threadpool.stopPool();
        System.out.println("All done, threadpool closed");
    }

}
4

1 回答 1

3

你的代码

t.currentThread().interrupt();

会反复中断当前线程,而不是线程t尝试

t.interrupt();

顺便说一句:我希望wait()抛出一个异常,您记录并继续,就好像它没有发生一样。


为了您的兴趣,这是您可以使用 ExecutorService 编写它的方式。

ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 5; i++) {
    final int id = i;
    service.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            System.out.println("This is task #" + id);
            Thread.sleep(1000);
            return null;
        }
    });
}
System.out.println("All tasks added");
service.shutdown();
service.awaitTermination(1, TimeUnit.HOURS);
System.out.println("All done, threadpool closed");

印刷

This is task #1
This is task #3
This is task #2
All tasks added
This is task #4
This is task #5
All done, threadpool closed
于 2012-10-02T10:54:30.033 回答