1

我有一个相当标准的生产者和消费者线程:

  • 生产者从文件读取字节并将其解码到阻塞队列。
  • 消费者正在从队列中轮询项目

发生解码过程是一个瓶颈,并且可能会从拥有更多 CPU 中受益。它是生产者时间的 70%。如果我引入“解码器”线程,我会获得任何显着的性能吗?

  • 生产者从文件中读取字节到阻塞的“对象”队列
  • 解码器将字节对象解码为项目
  • 消费者正在轮询“解码”项目

由于内存占用,我需要使用一个队列 - 不能拥有两个队列(字节/项目),所以我猜会出现对象“转换”开销?

关于如何实现这个 3 线程解决方案的任何想法?

谢谢!

4

2 回答 2

0

您应该为生产者和消费者调整线程池 - 例如,如果消费者相对于生产者来说太快了,那么它的线程池分配的线程可能比生产者的线程池少。这应该会导致吞吐量的显着增加。应调整生产者与消费者线程的比例(示例 3:1)。

在类似的行中,您可以拥有三个线程池,其中生产者(读取器)和消费者的线程数较少,而解码器(转换器)线程池的线程数较多。我不确定您是否需要代码示例,在这种情况下您应该分享您当前拥有的内容。我将从生产者和消费者的大小为 1 的线程池和转换器(解码器)的大小为 5 的线程池开始,然后测量瓶颈是什么(如果吞吐量符合您的期望)

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerDecoderConsumer {
    /**
     * @param args
     */
    public static void main(String[] args) {
        BlockingQueue<Integer> inputQueue = new PriorityBlockingQueue<Integer>();
        BlockingQueue<String> outputQueue = new PriorityBlockingQueue<String>();
        ExecutorService reader = Executors.newSingleThreadExecutor();
        reader.submit(new Producer(inputQueue));
        ExecutorService decoder = Executors.newFixedThreadPool(5);
        decoder.submit(new Transformer(inputQueue, outputQueue));
        ExecutorService writer = Executors.newSingleThreadExecutor();
        writer.submit(new Consumer(outputQueue));

    }

    private static class Producer implements Callable<Void> {
        final BlockingQueue<Integer> queue;

        public Producer(final BlockingQueue<Integer> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                Random random = new Random();
                while (true) {
                    queue.put(random.nextInt());
                }
            } catch (Exception e) {

            }
            return null;
        }
    }

    private static class Transformer implements Callable<Void> {
        final BlockingQueue<Integer> inputQueue;

        final BlockingQueue<String> outputQueue;

        public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
            inputQueue = pInputQueue;
            outputQueue = pOutputQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    Integer input = inputQueue.take();
                    String output = String.valueOf(input); // decode input to output
                    outputQueue.put(output); // output
                }
            } catch (Exception e) {

            }
            return null;
        }
    }

    private static class Consumer implements Callable<Void> {
        final BlockingQueue<String> queue;

        public Consumer(final BlockingQueue<String> pQueue) {
            queue = pQueue;
        }

        @Override
        public Void call() throws Exception {
            try {
                while (true) {
                    System.out.println(queue.take());
                }
            } catch (Exception e) {

            }
            return null;
        }
    }
}

我添加了一些代码来说明这个想法 - 我使用两个阻塞队列,这与您的问题中提到的单个队列不同,因为我认为只有额外的队列不会有开销 - 我建议使用分析器证明这样的事情。但是,我希望您发现它很有用,并且如果您真的需要,可以将其改装为单队列模型。

于 2013-06-11T07:04:45.990 回答
0

2 个队列,一个用于保存多个消费者从中解码的未解码对象。

多个消费者将解码并将解码的对象写入第二个队列,最终消费者将从该队列中消费。

确保避免死锁(除非你真的知道自己在做什么,否则notifyAll()不要使用)notify()

于 2013-06-18T05:56:31.560 回答