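Although this is an old question, I'm posting my answer because I was recently looking for this solution myself. A DelayQueue can be used behind a ThreadPoolExecutor; it just needs some wrapper code. The trick is to have the DelayQueue present itself as a BlockingQueue of Runnable, which is the queue type the ThreadPoolExecutor constructor expects.
I started by defining an interface, DR, that extends both Runnable and Delayed. Note that the static methods here create instances of DR (the implementing class, Impl, is not shown).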
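import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Static methods in an interface require Java 8 or later.
public interface DR extends Delayed, Runnable {
    // Wraps r in a DR; returns r unchanged if it already is one.
    public static DR make( Runnable r )
    {
        if (r instanceof DR)
        {
            return (DR)r;
        }
        Impl impl = new Impl(r);
        // If r is Delayed but not a DR, carry its delay over to the wrapper.
        if (r instanceof Delayed)
        {
            impl.expires = ((Delayed) r).getDelay( TimeUnit.MILLISECONDS );
        }
        return impl;
    }

    // Wraps r with the given expiry; r itself is reused only if it is
    // already a DR reporting the same delay.
    public static DR make( Runnable r, long expires )
    {
        if (r instanceof DR)
        {
            if (expires == ((DR)r).getDelay( TimeUnit.MILLISECONDS ))
            {
                return (DR)r;
            }
        }
        return new Impl(r, expires);
    }
}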
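The implementation should override public int compareTo(Delayed o), public boolean equals( Object o ), and public int hashCode(). The last two matter because the wrapper class below re-wraps its argument with DR.make in contains and remove, so two DR instances that wrap the same Runnable need to compare equal.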
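Since Impl is not shown, here is one minimal sketch of what it could look like. Everything in it is an assumption rather than the original code: DR is reduced to a bare stand-in without the factory methods, the constructor argument is treated as a delay in milliseconds measured from construction, and the deadline is kept on the System.nanoTime() clock. Equality is based on the wrapped Runnable so that a freshly wrapped task matches the copy already sitting in the queue.

```java
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Bare stand-in for the DR interface above (factory methods omitted).
interface DR extends Delayed, Runnable {}

// Hypothetical implementation; not the original author's Impl.
class Impl implements DR {
    private final Runnable task;
    // Assumption: absolute deadline on the System.nanoTime() clock.
    private final long deadlineNanos;

    Impl(Runnable task) {
        this(task, 0L); // no delay: available immediately
    }

    // Assumption: delayMillis is relative to the moment of construction.
    Impl(Runnable task, long delayMillis) {
        this.task = Objects.requireNonNull(task);
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public void run() {
        task.run();
    }

    // Remaining delay; zero or negative once the deadline has passed.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders by deadline so the DelayQueue head is the task that expires first.
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof Impl) {
            return Long.signum(deadlineNanos - ((Impl) o).deadlineNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }

    // Two wrappers are equal when they wrap equal tasks, whatever their deadlines.
    @Override
    public boolean equals(Object o) {
        return o instanceof Impl && task.equals(((Impl) o).task);
    }

    @Override
    public int hashCode() {
        return task.hashCode();
    }
}
```

With this choice compareTo is deliberately not consistent with equals: ordering follows the deadline, while lookups such as contains and remove follow the wrapped task. To work with the make methods above, the class would still need to be adapted to the expires field they assign.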
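Next, create a class that extends DelayQueue. This class adds a method that presents the delay queue as a BlockingQueue of Runnable. The returned object simply wraps the DelayQueue and uses the DR interface's make method to convert a Runnable to a DR where needed.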
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
// Assumption: WrappedIterator is not a JDK class and the answer does not say
// where it comes from; it looks like the one from Apache Jena.
import org.apache.jena.util.iterator.WrappedIterator;

public class DelayedBlockingQueue extends DelayQueue<DR> {
    // Returns a BlockingQueue<Runnable> view backed by this DelayQueue.
    public BlockingQueue<Runnable> asRunnableQueue() {
        return new BlockingQueue<Runnable>(){
            DelayedBlockingQueue dbq = DelayedBlockingQueue.this;
            public boolean add(Runnable e) {
                return dbq.add( DR.make( e ));
            }
            // Converts a collection of Runnables to a list of DRs.
            private List<DR> makeList( Collection<? extends Runnable> coll)
            {
                return coll.stream().map( r -> DR.make( r ) ).collect( Collectors.toList() ) ;
            }
            public boolean addAll(Collection<? extends Runnable> arg0) {
                return dbq.addAll(makeList( arg0 ) );
            }
            public void clear() {
                dbq.clear();
            }
            public boolean contains(Object o) {
                if (o instanceof Runnable) {
                    return dbq.contains( DR.make( (Runnable)o ) );
                }
                return false;
            }
            public boolean containsAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    if (o instanceof Runnable)
                    {
                        lst.add( DR.make( (Runnable)o ) );
                    }
                    else {
                        // A non-Runnable can never be in the queue.
                        return false;
                    }
                }
                return dbq.containsAll( lst );
            }
            public int drainTo(Collection<? super Runnable> c, int maxElements) {
                return dbq.drainTo( c, maxElements );
            }
            public int drainTo(Collection<? super Runnable> c) {
                return dbq.drainTo( c );
            }
            public Runnable element() {
                return dbq.element();
            }
            public void forEach(Consumer<? super Runnable> arg0) {
                dbq.forEach( arg0 );
            }
            public boolean isEmpty() {
                return dbq.isEmpty();
            }
            public Iterator<Runnable> iterator() {
                // Adapts Iterator<DR> to Iterator<Runnable>.
                return WrappedIterator.create( dbq.iterator() ).mapWith( dr -> (Runnable)dr );
            }
            public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.offer( DR.make( e ), timeout, unit );
            }
            public boolean offer(Runnable e) {
                return dbq.offer( DR.make( e ) );
            }
            public Runnable peek() {
                return dbq.peek();
            }
            public Runnable poll() {
                return dbq.poll();
            }
            public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.poll( timeout, unit );
            }
            public void put(Runnable e) throws InterruptedException {
                dbq.put( DR.make(e) );
            }
            public int remainingCapacity() {
                return dbq.remainingCapacity();
            }
            public Runnable remove() {
                return dbq.remove();
            }
            public boolean remove(Object o) {
                if (o instanceof Runnable)
                {
                    return dbq.remove( DR.make( (Runnable)o) );
                }
                return false;
            }
            public boolean removeAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    // Non-Runnables cannot be in the queue, so they are skipped.
                    if (o instanceof Runnable)
                    {
                        lst.add( DR.make( (Runnable)o ) );
                    }
                }
                return dbq.removeAll( lst );
            }
            public boolean retainAll(Collection<?> arg0) {
                return dbq.retainAll( arg0 );
            }
            public int size() {
                return dbq.size();
            }
            public Runnable take() throws InterruptedException {
                return dbq.take();
            }
            public Object[] toArray() {
                return dbq.toArray();
            }
            public <T> T[] toArray(T[] arg0) {
                return dbq.toArray( arg0 );
            }
        };
    }
}
To use the solution, create a DelayedBlockingQueue and pass the queue returned by its asRunnableQueue() method to the ThreadPoolExecutor constructor.
DelayedBlockingQueue queue = new DelayedBlockingQueue();
ThreadPoolExecutor execService = new ThreadPoolExecutor( 1, 5, 30, TimeUnit.SECONDS, queue.asRunnableQueue() );
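One thing to watch for with this setup: while fewer than corePoolSize threads are running, ThreadPoolExecutor hands a new task straight to a fresh thread instead of queuing it, so the delay would be skipped for that task. Calling prestartAllCoreThreads() before submitting work is one way around that. The sketch below illustrates the difference. To stay short and self-contained it does not use DelayedBlockingQueue; the names TimedTask and PrestartDemo are made up for this example, and the unchecked cast of a plain DelayQueue is a shortcut that only works because every task submitted here is Delayed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical task type for this demo: a Runnable that is also Delayed.
class TimedTask implements Runnable, Delayed {
    private final Runnable body;
    private final long deadlineNanos;

    TimedTask(Runnable body, long delayMillis) {
        this.body = body;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public void run() {
        body.run();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }
}

class PrestartDemo {
    // Returns the nanoseconds between submitting a task and the task running.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static long elapsedNanos(long delayMillis, boolean prestart) {
        // Demo-only shortcut: unchecked cast instead of asRunnableQueue().
        BlockingQueue<Runnable> queue = (BlockingQueue) new DelayQueue<TimedTask>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, queue);
        if (prestart) {
            // The core thread starts now and waits on the queue,
            // so the task submitted below has to go through the queue.
            pool.prestartAllCoreThreads();
        }
        CountDownLatch done = new CountDownLatch(1);
        AtomicLong ranAt = new AtomicLong();
        long start = System.nanoTime();
        pool.execute(new TimedTask(() -> {
            ranAt.set(System.nanoTime());
            done.countDown();
        }, delayMillis));
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return ranAt.get() - start;
    }
}
```

Calling PrestartDemo.elapsedNanos(200, true) should report at least 200 ms, while the same call with false typically returns almost immediately because the task never passes through the queue. Also note that DelayQueue is unbounded, so the executor is not expected to grow beyond its core pool size; the maximum of 5 in the snippet above would have no effect.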