6

当我在我的一个项目中使用它时,我开始从 Java Doc 中阅读更多关于 ThreadPoolExecutor 的信息。那么谁能解释一下这条线的实际含义是什么?-我知道每个参数代表什么,但我想从这里的一些专家那里以更一般/外行的方式理解它。

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy());

更新:- 问题陈述是:-

每个线程使用 1 到 1000 之间的唯一 ID,并且程序必须运行 60 分钟或更长时间,因此在这 60 分钟内,所有 ID 都可能完成,因此我需要再次重用这些 ID。所以这是我使用上面的执行器编写的下面的程序。

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

// This method needs to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " +id);
// and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

我的问题是:- 就性能而言,此代码是否正确?还有什么我可以在这里使它更准确?任何帮助将不胜感激。

4

4 回答 4

13

[首先,我很抱歉,这是对先前答案的回应,但我想要格式化]。

除非在现实中,当一个项目被提交到具有完整队列的 ThreadPoolExecutor 时,您不会阻塞。原因是 ThreadPoolExecutor 调用了 BlockingQueue.offer(T item) 方法,该方法定义为非阻塞方法。它要么添加项目并返回 true,要么不添加(满时)并返回 false。ThreadPoolExecutor 然后调用注册的 RejectedExecutionHandler 来处理这种情况。

从javadoc:

在未来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果任务无法提交执行,要么是因为这个执行器已经关闭,要么是因为它的容量已经达到,任务由当前的 RejectedExecutionHandler 处理。

默认情况下,使用 ThreadPoolExecutor.AbortPolicy() 从 ThreadPoolExecutor 的“提交”或“执行”方法抛出 RejectedExecutionException。

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}

但是,您可以使用其他处理程序来做一些不同的事情,例如忽略错误(DiscardPolicy),或者在调用“执行”或“提交”方法的线程中运行它(CallerRunsPolicy)。此示例让调用“提交”或“执行”方法的线程在队列已满时运行请求的任务。(这意味着在任何给定时间,您都可以在池本身的内容之上运行 1 个额外的东西):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());

如果你想阻塞并等待,你可以实现你自己的 RejectedExecutionHandler 它将阻塞直到队列上有一个可用的插槽(这是一个粗略的估计,我没有编译或运行它,但你应该明白):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (e.isTerminated() || e.isShutdown()) {
        return;
     }

     boolean submitted = false;
     while (! submitted) {
       if (Thread.currentThread().isInterrupted()) {
            // be a good citizen and do something nice if we were interrupted
            // anywhere other than during the sleep method.
       }

       try {
          e.execute(r);
          submitted = true;
       }
       catch (RejectedExceptionException e) {
         try {
           // Sleep for a little bit, and try again.
           Thread.sleep(100L);
         }
         catch (InterruptedException e) {
           ; // do you care if someone called Thread.interrupt?
           // if so, do something nice here, and maybe just silently return.
         }
       }
     }
  }
}
于 2012-05-27T04:16:55.777 回答
2

It's creating an ExecutorService which handles the execution of a pool of threads. Both the initial and maximum number of threads in the pool is 10 in this case. When a thread in the pool becomes idle for 1 second (1000ms) it will kill it (the idle timer), however because the max and core number of threads is the same, this will never happen (it always keeps 10 threads around and will never run more than 10 threads).

It uses an ArrayBlockingQueue to manage the execution requests with 10 slots, so when the queue is full (after 10 threads have been enqueued), it will block the caller.

If thread is rejected (which in this case would be due to the service shutting down, since threads will be queued or you will be blocked when queuing a thread if the queue is full), then the offered Runnable will be executed on the caller's thread.

于 2012-05-26T23:15:07.850 回答
0

考虑信号量。这些都是为了相同的目的。请检查下面使用信号量的代码。不确定这是否是您想要的。但是,如果没有更多的许可证可以获取,这将被阻止。ID对您来说也很重要吗?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadNewTask implements Runnable {
    private Semaphore idPool;

    public ThreadNewTask(Semaphore idPool) {
        this.idPool = idPool;
    }

    public void run() {
//      Integer id = idPool.getExistingId();
        try {
            idPool.acquire();
            someMethod(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            idPool.release();
        }
//      idPool.releaseExistingId(id);
    }

    // This method needs to be synchronized or not?
    private void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do other calcuations whatever you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        Semaphore idPool = new Semaphore(100); 
//      IdPool idPool = new IdPool();
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while (System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}
于 2012-05-28T11:27:53.367 回答
0

另一种解决方案是破解底层队列以替换offeroffer大超时(最长 292 年,可以认为是无限的)。


// helper method
private static boolean interruptibleInfiniteOffer(BlockingQueue<Runnable> q, Runnable r) {
    try {
        return q.offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS); // infinite == ~292 years
    } catch (InterruptedException e) {
        return false;
    }
}

// fixed size pool with blocking (instead of rejecting) if bounded queue is full
public static ThreadPoolExecutor getFixedSizePoolWithLimitedWaitingQueue(int nThreads, int maxItemsInTheQueue) {
    BlockingQueue<Runnable> queue = maxItemsInTheQueue == 0
            ? new SynchronousQueue<>() { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} }
            : new ArrayBlockingQueue<>(maxItemsInTheQueue) { public boolean offer(Runnable r) { return interruptibleInfiniteOffer(this, r);} };
    return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.MILLISECONDS, queue);
}
于 2021-03-07T14:37:21.193 回答