我有一个系统,可以从列表中读取名称,调用外部服务器进行真/假状态检查,并对具有真实状态的人采取行动。对外部服务器的调用需要一些时间,因此在一个线程中运行它并不是很有效。
我目前正在尝试将其实现为生产者/消费者系统,其中许多消费者线程从列表中读取名称,调用外部服务器,将有效名称放入阻塞队列中,并让单个消费者从队列中挑选项目并采取行动. 遗憾的是,系统有时会运行完成,有时会无限期挂起。测试代码如下
public class SubscriberTest {
static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
Random rand = new Random();
public SubscriberTest(int i) {
for (int j = 0; j < i; j++) {
subscribed.add("I love:" + j);
}
}
public SubscriberTest(Queue<String> subs) {
subscribed = subs;
}
public static void main(String[] args) {
SubscriberTest fun = new SubscriberTest(10000);
System.out.println(subscribed.size());
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newSingleThreadExecutor();
Consumer consumer = fun.new Consumer();
Producer producer = fun.new Producer();
while (!subscribed.isEmpty()) {
producers.execute(producer);
consumers.execute(consumer);
}
producers.shutdown();
consumers.shutdown();
System.out.println("finally");
}
// take names from subscribed and get status
class Producer implements Runnable {
public void run() {
String x = subscribed.poll();
System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
try {
if (getStatus(x)) {
valid.put(x);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// this is a call to an external server
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
// takes names from valid queue and save them
class Consumer implements Runnable {
public void run() {
try {
System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
请告诉我哪里出错了。