自定义阻塞队列
package com.gunjan;
import java.util.concurrent.BlockingQueue;
/**
 * Base class for a {@link BlockingQueue} wrapper whose plain {@link #offer(Object)} always
 * reports failure, while {@link #customOffer(Object)} performs the real insertion into the
 * wrapped queue.
 *
 * <p>When an instance is used as the work queue of a
 * {@code java.util.concurrent.ThreadPoolExecutor}, a failed {@code offer} makes the executor
 * try to start another worker thread (up to its maximum pool size) instead of queueing the
 * task; a rejection handler can then call {@link #customOffer(Object)} to queue the task.
 *
 * <p>All other {@code BlockingQueue} methods are left abstract for subclasses to implement.
 *
 * @param <E> the type of elements held in the queue
 */
public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {

    /** The wrapped queue that actually stores the elements. */
    public BlockingQueue<E> blockingQueue;

    /**
     * Wraps the given queue.
     *
     * @param blockingQueue the queue that stores the elements; must not be {@code null}
     * @throws NullPointerException if {@code blockingQueue} is {@code null}
     */
    public CustomBlockingQueue(BlockingQueue<E> blockingQueue) {
        // Fail fast here rather than on the first delegated call.
        if (blockingQueue == null) {
            throw new NullPointerException("blockingQueue");
        }
        this.blockingQueue = blockingQueue;
    }

    /**
     * Always refuses the element without touching the wrapped queue.
     *
     * @param e the element offered (ignored)
     * @return {@code false}, unconditionally
     */
    @Override
    public final boolean offer(E e) {
        return false;
    }

    /**
     * Inserts the element into the wrapped queue without blocking.
     *
     * @param e the element to insert
     * @return the result of the wrapped queue's {@code offer(e)}: {@code true} if the element
     *         was added, {@code false} otherwise
     */
    public final boolean customOffer(E e) {
        return blockingQueue.offer(e);
    }
}
线程池阻塞队列
package com.gunjan;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Concrete {@link CustomBlockingQueue} that forwards every queue operation it implements to
 * the wrapped {@code blockingQueue}.
 *
 * <p>The single-argument {@code offer(E)} is not implemented here; it is inherited from
 * {@link CustomBlockingQueue}.
 *
 * @param <E> the type of elements held in the queue
 */
public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {

    /**
     * Wraps the given queue.
     *
     * @param blockingQueue the queue that stores the elements
     */
    public ThreadPoolBlockingQueue(BlockingQueue<E> blockingQueue) {
        super(blockingQueue);
    }

    // ---- Queue: non-blocking head access ----

    /** Delegates to the wrapped queue's {@code remove()}. */
    @Override
    public E remove() {
        return this.blockingQueue.remove();
    }

    /** Delegates to the wrapped queue's {@code poll()}. */
    @Override
    public E poll() {
        return this.blockingQueue.poll();
    }

    /** Delegates to the wrapped queue's {@code element()}. */
    @Override
    public E element() {
        return this.blockingQueue.element();
    }

    /** Delegates to the wrapped queue's {@code peek()}. */
    @Override
    public E peek() {
        return this.blockingQueue.peek();
    }

    // ---- Collection: inspection ----

    /** Delegates to the wrapped queue's {@code size()}. */
    @Override
    public int size() {
        return this.blockingQueue.size();
    }

    /** Delegates to the wrapped queue's {@code isEmpty()}. */
    @Override
    public boolean isEmpty() {
        return this.blockingQueue.isEmpty();
    }

    /** Delegates to the wrapped queue's {@code iterator()}. */
    @Override
    public Iterator<E> iterator() {
        return this.blockingQueue.iterator();
    }

    /** Delegates to the wrapped queue's {@code toArray()}. */
    @Override
    public Object[] toArray() {
        return this.blockingQueue.toArray();
    }

    /** Delegates to the wrapped queue's {@code toArray(T[])}. */
    @Override
    public <T> T[] toArray(T[] a) {
        return this.blockingQueue.toArray(a);
    }

    // ---- Collection: bulk operations ----

    /** Delegates to the wrapped queue's {@code containsAll(Collection)}. */
    @Override
    public boolean containsAll(Collection<?> c) {
        return this.blockingQueue.containsAll(c);
    }

    /** Delegates to the wrapped queue's {@code addAll(Collection)}. */
    @Override
    public boolean addAll(Collection<? extends E> c) {
        return this.blockingQueue.addAll(c);
    }

    /** Delegates to the wrapped queue's {@code removeAll(Collection)}. */
    @Override
    public boolean removeAll(Collection<?> c) {
        return this.blockingQueue.removeAll(c);
    }

    /** Delegates to the wrapped queue's {@code retainAll(Collection)}. */
    @Override
    public boolean retainAll(Collection<?> c) {
        return this.blockingQueue.retainAll(c);
    }

    /** Delegates to the wrapped queue's {@code clear()}. */
    @Override
    public void clear() {
        this.blockingQueue.clear();
    }

    // ---- BlockingQueue: insertion ----

    /** Delegates to the wrapped queue's {@code add(E)}. */
    @Override
    public boolean add(E e) {
        return this.blockingQueue.add(e);
    }

    /** Delegates to the wrapped queue's {@code put(E)}. */
    @Override
    public void put(E e) throws InterruptedException {
        this.blockingQueue.put(e);
    }

    /** Delegates to the wrapped queue's timed {@code offer(E, long, TimeUnit)}. */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.offer(e, timeout, unit);
    }

    // ---- BlockingQueue: removal ----

    /** Delegates to the wrapped queue's {@code take()}. */
    @Override
    public E take() throws InterruptedException {
        return this.blockingQueue.take();
    }

    /** Delegates to the wrapped queue's timed {@code poll(long, TimeUnit)}. */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.poll(timeout, unit);
    }

    /** Delegates to the wrapped queue's {@code remainingCapacity()}. */
    @Override
    public int remainingCapacity() {
        return this.blockingQueue.remainingCapacity();
    }

    /** Delegates to the wrapped queue's {@code remove(Object)}. */
    @Override
    public boolean remove(Object o) {
        return this.blockingQueue.remove(o);
    }

    /** Delegates to the wrapped queue's {@code contains(Object)}. */
    @Override
    public boolean contains(Object o) {
        return this.blockingQueue.contains(o);
    }

    /** Delegates to the wrapped queue's {@code drainTo(Collection)}. */
    @Override
    public int drainTo(Collection<? super E> c) {
        return this.blockingQueue.drainTo(c);
    }

    /** Delegates to the wrapped queue's {@code drainTo(Collection, int)}. */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return this.blockingQueue.drainTo(c, maxElements);
    }
}
RejectedExecutionHandlerImpl
package com.gunjan;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Rejection handler that queues a rejected task through
 * {@code CustomBlockingQueue.customOffer}, for executors whose work queue is a
 * {@code CustomBlockingQueue}.
 *
 * <p>The task is rejected with a {@link RejectedExecutionException} when the executor has
 * been shut down, when its work queue is not a {@code CustomBlockingQueue}, or when the
 * queue refuses the task.
 */
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    /**
     * Tries to place the rejected task into the executor's work queue.
     *
     * @param r the task that the executor could not accept
     * @param executor the executor that rejected the task
     * @throws RejectedExecutionException if the executor is shut down, its queue is not a
     *         {@code CustomBlockingQueue}, or the queue does not accept the task
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A shut-down executor must not take new work; queueing here could strand the task.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": executor has been shut down");
        }

        // Report an unsupported queue as a rejection instead of a ClassCastException.
        if (!(executor.getQueue() instanceof CustomBlockingQueue)) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor
                            + ": work queue is not a CustomBlockingQueue");
        }
        CustomBlockingQueue<Runnable> workQueue =
                (CustomBlockingQueue<Runnable>) executor.getQueue();

        boolean inserted = workQueue.customOffer(r);
        if (!inserted) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": work queue is full");
        }

        // Re-check after queueing: if shutdown raced with the insert and the task can still
        // be pulled back out, reject it rather than leave it behind.
        if (executor.isShutdown() && executor.remove(r)) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor + ": executor has been shut down");
        }
    }
}
CustomThreadPoolExecutorTest
package com.gunjan;
import java.util.concurrent.*;
/**
 * Manual demo that drives a {@link ThreadPoolExecutor} backed by a
 * {@code ThreadPoolBlockingQueue} and a {@code RejectedExecutionHandlerImpl}.
 *
 * <p>It submits 750 one-second tasks to a pool of 5 core / 100 maximum threads with a
 * 500-slot backing queue, printing the executor's state as tasks finish and any rejection
 * that occurs.
 */
public class CustomThreadPoolExecutorTest {

    /**
     * Runs the demo and waits for all accepted tasks to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *         executor to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> backingQueue = new LinkedBlockingQueue<Runnable>(500);
        CustomBlockingQueue<Runnable> workQueue =
                new ThreadPoolBlockingQueue<Runnable>(backingQueue);
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 100, 60, TimeUnit.SECONDS, workQueue, new RejectedExecutionHandlerImpl());

        for (int i = 0; i < 750; i++) {
            try {
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(1000);
                            System.out.println(threadPoolExecutor);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the worker thread can observe it.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // Rejections are expected once the threads and the queue are both full.
                e.printStackTrace();
            }
        }

        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
        System.out.println(threadPoolExecutor);
    }
}