0

我一直在尝试使用 Active Objects 进行线程池的简单实现。

这是我的主要内容:

public static void main(String[] args){
   MyThreadPool tp = new MyThreadPool(100,3);
        tp.execute(()->{
            try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
            System.out.println("42");
        });
        tp.shutDown();
}

shutdown 方法通常首先通过 Main 调用,因此会不必要地保持活动对象“活动”,但有时我会得到想要的结果。知道为什么结果存在不确定性吗?

您可以在下面看到其余的课程

public class MyThreadPool {

    ArrayBlockingQueue<Runnable> q;
    ArrayBlockingQueue<ActiveObject> activeObjects;
    volatile boolean stop;
    AtomicInteger count;
    Thread t;
    Runnable stopTask;

    public MyThreadPool(int capacity, int maxThreads) {
        activeObjects = new ArrayBlockingQueue<>(maxThreads);
        q = new ArrayBlockingQueue<>(capacity);
        count = new AtomicInteger(0);
        stopTask = ()->stop = true;

        t=new Thread(()->{
            //System.out.println("Thread-Pool Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    Runnable task = q.take();
                    if(task==stopTask)
                        stopTask.run();
                    else
                        //size() is atomic integer
                        if (count.get() < maxThreads) {
                            ActiveObject a = new ActiveObject(capacity);
                            activeObjects.put(a);
                            count.incrementAndGet();
                            a.execute(task);
                        }
                        //we will assign the next task to the least busy ActiveObject
                        else {
                            int minSize = Integer.MAX_VALUE;
                            ActiveObject choice = null;
                            for (ActiveObject a : activeObjects) {
                                if (a.size() < minSize) {
                                    minSize = a.size();
                                    choice = a;
                                }
                            }
                            choice.execute(task);
                        }

                } catch (InterruptedException e) { }
            }
            //System.out.println("Thread-Pool Ended");
        });
       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        activeObjects.forEach(a->a.shutDownNow());
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        activeObjects.forEach(a->a.shutDown());
        execute(stopTask);
    }
public class ActiveObject {

    ArrayBlockingQueue<Runnable> q;
    volatile boolean stop;
    Thread t;

    public ActiveObject(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
        t=new Thread(()->{
            //System.out.println("Active Object Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    q.take().run();
                } catch (InterruptedException e) { }
            }
            //System.out.println("Active Object Ended");
        });

       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        execute(()->stop=true);
    }

    public int size(){
        return q.size();
    }
}
4

1 回答 1

1

在您的 main 方法中,您创建一个线程池(它也创建和启动tp.t线程),将任务排入队列tp.q,然后调用tp.shutDown()

MyThreadPool tp = new MyThreadPool(100, 3);
tp.execute(() -> {...});
tp.shutDown();

想象一下tp.shutDown()在主线程中执行MyThreadPool.t线程处理入队任务之前:

activeObjects.forEach(a -> a.shutDown());
execute(stopTask);

这里activeObjects是空的,你stopTask入队tp.qmain线程结束。

现在我们只有MyThreadPool.t线程,让我们看看它做了什么:

while (!stop) {
  try {
    Runnable task = q.take();
    if (task == stopTask)
      stopTask.run();
    else
    if (count.get() < maxThreads) {
      ActiveObject a = new ActiveObject(capacity);
      activeObjects.put(a);
      count.incrementAndGet();
      a.execute(task);
    }
    else {
      ...
    }
  } catch (InterruptedException e) {
  }
}

此时q包含 2 个任务:一个普通任务和stopTask.

在第一次循环迭代中,正常任务取自q,并用于处理新创建的ActiveObject

ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);

new ActiveObject()还创建并启动自己的内部ActiveObject.t线程。

第二个循环迭代过程stopTask

if (task == stopTask)
  stopTask.run();

设置stop = true
结果,下一个检查while (!stop)返回false并且MyThreadPool.t线程完成。

现在我们只有ActiveObject.t线程,它还没有停止:

while (!stop) {
  try {
    q.take().run();
  } catch (InterruptedException e) {
  }
} 

在这里,线程将q.take()永远等待。

于 2021-06-30T02:31:16.120 回答