我正在尝试在我的应用程序中实现延迟队列。
这是将项目放入队列的延迟生产者。
package com.pra.delayed.queue;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class DelayedProducer {
private static Queue<Delayed> queue;
public DelayedProducer(Queue<Delayed> passedQueue) {
queue = passedQueue;
}
public void putDelayedObj(String xmlMessage, long time) throws InterruptedException{
queue.offer(new DelayedDataObject(xmlMessage, time));
}
public static void main(String[] args) {
Queue<Delayed> localQueue = new DelayQueue<Delayed>();
DelayedProducer delayedProducer = new DelayedProducer(localQueue);
try {
int i=0;
ExecutorService es = Executors.newFixedThreadPool(500);
for(int iTh=0; iTh<300; iTh++){
Runnable consumer = new DelayedConsumer(localQueue);
es.execute(consumer);
}
Thread.sleep(10*1000);
long ttt = System.currentTimeMillis();
while(i<=100000){
delayedProducer.putDelayedObj(i+"", ttt);
i++;
}
es.shutdown();
es.awaitTermination(10,TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这是一个 Runnable 对象的消费者。
package com.pra.delayed.queue;
import java.util.Queue;
import java.util.concurrent.Delayed;
public class DelayedConsumer implements Runnable{
private static Queue<Delayed> queue;
public DelayedConsumer(Queue<Delayed> passedQueue) {
queue = passedQueue;
}
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayedDataObject ddo = (DelayedDataObject)queue.poll();
if(ddo != null){
System.out.println(queue.size());
}
}
}
我放入了 100000 个对象,但只得到了 300 个对象。
请帮助我更正实施,并解释我在代码中的错误。