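I have a system that reads names from a list, calls an external server for a true/false status check, and takes action on the names whose status is true. The call to the external server takes a while, so running everything in a single thread is not very efficient.

I am currently trying to implement this as a producer/consumer system: many producer threads read names from the list, call the external server, and put the valid names into a blocking queue, while a single consumer picks items off the queue and acts on them. Unfortunately, the system sometimes runs to completion and sometimes hangs indefinitely. The test code is below.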

public class SubscriberTest {
    static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
    static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
    Random rand = new Random();

    public SubscriberTest(int i) {
        for (int j = 0; j < i; j++) {
            subscribed.add("I love:" + j);
        }
    }

    public SubscriberTest(Queue<String> subs) {
        subscribed = subs;
    }

    public static void main(String[] args) {
        SubscriberTest fun = new SubscriberTest(10000);
        System.out.println(subscribed.size());
        ExecutorService producers = Executors.newCachedThreadPool();
        ExecutorService consumers = Executors.newSingleThreadExecutor();
        Consumer consumer = fun.new Consumer();
        Producer producer = fun.new Producer();
        while (!subscribed.isEmpty()) {
            producers.execute(producer);
            consumers.execute(consumer);
        }
        producers.shutdown();
        consumers.shutdown();
        System.out.println("finally");
    }

    // take names from subscribed and get status
    class Producer implements Runnable {
        public void run() {

            String x = subscribed.poll();
            System.out.println("Producer: " + x + " " + Thread.currentThread().getName());
            try {
                if (getStatus(x)) {
                    valid.put(x);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // this is a call to an external server
        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }
    }

    // takes names from valid queue and save them
    class Consumer implements Runnable {

        public void run() {
            try {
                System.out.println("Consumer: " + valid.take() + " " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

Please tell me where I went wrong.

2 Answers

String x = subscribed.poll();

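returns null if nothing is available in the queue, which means you will then try to put a null into the "valid" queue. That throws a NullPointerException, and that particular thread exits. When this happens to all the threads in the pool, the application hangs.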
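
A minimal sketch of guarding against that, assuming the same two queues as in the question. The class name NullGuardSketch, the method produceOne, and the always-true getStatus stand-in are made up for illustration and are not part of the original code. The idea is simply to check the result of poll() before touching the valid queue:

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NullGuardSketch {

    // Hypothetical stand-in for the external status check; always true here
    // so that the example is deterministic.
    static boolean getStatus(String name) {
        return true;
    }

    // Processes at most one name. Returns false when the source queue was
    // already empty, so the caller knows there is nothing left to do.
    static boolean produceOne(Queue<String> subscribed, BlockingQueue<String> valid) {
        String x = subscribed.poll();   // null when the queue is empty
        if (x == null) {
            return false;               // skip instead of calling put(null)
        }
        try {
            if (getStatus(x)) {
                valid.put(x);           // blocks while the queue is full
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can see the interruption.
            Thread.currentThread().interrupt();
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<String> subscribed = new ConcurrentLinkedQueue<>();
        BlockingQueue<String> valid = new LinkedBlockingQueue<>(100);
        subscribed.add("I love:0");

        System.out.println(produceOne(subscribed, valid)); // a name was available
        System.out.println(produceOne(subscribed, valid)); // queue is empty now
        System.out.println(valid.size());
    }
}
```

With this check in place, a producer task that finds the list empty just returns, and no null ever reaches valid.put().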

Answered 2011-05-02T19:31:12.160

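An ExecutorService is a thread pool with a task queue. Adding another queue only adds complexity and increases the chance that you get something wrong. I suggest you just use the queue that is already there.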

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscriberTest {
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService consumers = Executors.newSingleThreadExecutor();
        // middle producer
        final ExecutorService producers = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        // subscribed/original producer.
        for (int i = 0; i < 1000*1000; i++) {
            final String task = "I love:" + i;
            producers.execute(new MidProducer(task, consumers));
        }

        producers.shutdown();
        producers.awaitTermination(10, TimeUnit.SECONDS);
        consumers.shutdown();
        System.out.println("finally");
    }


    static class MidProducer implements Runnable {
        private final Random rand = new Random();
        private final String task;
        private final ExecutorService consumers;

        public MidProducer(String task, ExecutorService consumers) {
            this.task = task;
            this.consumers = consumers;
        }

        public void run() {
            System.out.println("Producer: " + task + " " + Thread.currentThread().getName());

            if (getStatus(task))
                consumers.execute(new Consumer(task));
        }

        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }

    }

    static class Consumer implements Runnable {
        private final String task;

        private Consumer(String task) {
            this.task = task;
        }

        public void run() {
            System.out.println("Consumer: " + task + " " + Thread.currentThread().getName());
        }
    }
}

prints

Producer: I love: 1 pool-2-thread-2
Producer: I love: 3 pool-2-thread-4
Producer: I love: 2 pool-2-thread-3
Producer: I love: 5 pool-2-thread-2
Producer: I love: 7 pool-2-thread-2
Producer: I love: 4 pool-2-thread-5
Producer: I love: 6 pool-2-thread-6
Producer: I love: 8 pool-2-thread-7
Producer: I love: 10 pool-2-thread-2
Producer: I love: 9 pool-2-thread-5
Producer: I love: 11 pool-2-thread-8
Producer: I love: 12 pool-2-thread-9
Producer: I love: 14 pool-2-thread-10
Producer: I love: 13 pool-2-thread-2
Producer: I love: 16 pool-2-thread-10
Producer: I love: 15 pool-2-thread-11
Producer: I love: 17 pool-2-thread-12
Producer: I love: 20 pool-2-thread-14
Producer: I love: 19 pool-2-thread-10
Producer: I love: 18 pool-2-thread-13
Producer: I love: 0 pool-2-thread-1
Producer: I love: 22 pool-2-thread-12
Producer: I love: 21 pool-2-thread-15
Producer: I love: 25 pool-2-thread-3
Producer: I love: 27 pool-2-thread-12
Producer: I love: 26 pool-2-thread-10
Producer: I love: 24 pool-2-thread-15
Producer: I love: 28 pool-2-thread-1
Producer: I love: 23 pool-2-thread-16
Producer: I love: 31 pool-2-thread-11
Producer: I love: 30 pool-2-thread-16
Producer: I love: 32 pool-2-thread-1
Producer: I love: 36 pool-2-thread-3
Consumer: I love: 2 pool-1-thread-1

...

Consumer: I love: 9975 pool-1-thread-1
Consumer: I love: 9977 pool-1-thread-1
Consumer: I love: 9978 pool-1-thread-1
Consumer: I love: 9979 pool-1-thread-1
Consumer: I love: 9981 pool-1-thread-1
Producer: I love: 9996 pool-2-thread-16
Consumer: I love: 9984 pool-1-thread-1
Consumer: I love: 9985 pool-1-thread-1
Consumer: I love: 9990 pool-1-thread-1
Consumer: I love: 9992 pool-1-thread-1
Producer: I love: 9997 pool-2-thread-16
Consumer: I love: 9994 pool-1-thread-1
Consumer: I love: 9995 pool-1-thread-1
Consumer: I love: 9996 pool-1-thread-1
Producer: I love: 9998 pool-2-thread-16
Producer: I love: 9999 pool-2-thread-16
Consumer: I love: 9997 pool-1-thread-1
Consumer: I love: 9998 pool-1-thread-1
Consumer: I love: 9999 pool-1-thread-1
finally
Answered 2011-05-02T19:56:51.467