您应该为生产者和消费者调整线程池 - 例如,如果消费者相对于生产者来说太快了,那么它的线程池分配的线程可能比生产者的线程池少。这应该会导致吞吐量的显着增加。应调整生产者与消费者线程的比例(示例 3:1)。
在类似的行中,您可以拥有三个线程池,其中生产者(读取器)和消费者的线程数较少,而解码器(转换器)线程池的线程数较多。我不确定您是否需要代码示例,在这种情况下您应该分享您当前拥有的内容。我将从生产者和消费者的大小为 1 的线程池和转换器(解码器)的大小为 5 的线程池开始,然后测量瓶颈是什么(如果吞吐量符合您的期望)
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerDecoderConsumer {
/**
* @param args
*/
public static void main(String[] args) {
BlockingQueue<Integer> inputQueue = new PriorityBlockingQueue<Integer>();
BlockingQueue<String> outputQueue = new PriorityBlockingQueue<String>();
ExecutorService reader = Executors.newSingleThreadExecutor();
reader.submit(new Producer(inputQueue));
ExecutorService decoder = Executors.newFixedThreadPool(5);
decoder.submit(new Transformer(inputQueue, outputQueue));
ExecutorService writer = Executors.newSingleThreadExecutor();
writer.submit(new Consumer(outputQueue));
}
private static class Producer implements Callable<Void> {
final BlockingQueue<Integer> queue;
public Producer(final BlockingQueue<Integer> pQueue) {
queue = pQueue;
}
@Override
public Void call() throws Exception {
try {
Random random = new Random();
while (true) {
queue.put(random.nextInt());
}
} catch (Exception e) {
}
return null;
}
}
private static class Transformer implements Callable<Void> {
final BlockingQueue<Integer> inputQueue;
final BlockingQueue<String> outputQueue;
public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
inputQueue = pInputQueue;
outputQueue = pOutputQueue;
}
@Override
public Void call() throws Exception {
try {
while (true) {
Integer input = inputQueue.take();
String output = String.valueOf(input); // decode input to output
outputQueue.put(output); // output
}
} catch (Exception e) {
}
return null;
}
}
private static class Consumer implements Callable<Void> {
final BlockingQueue<String> queue;
public Consumer(final BlockingQueue<String> pQueue) {
queue = pQueue;
}
@Override
public Void call() throws Exception {
try {
while (true) {
System.out.println(queue.take());
}
} catch (Exception e) {
}
return null;
}
}
}
我添加了一些代码来说明这个想法 - 我使用两个阻塞队列,这与您的问题中提到的单个队列不同,因为我认为只有额外的队列不会有开销 - 我建议使用分析器证明这样的事情。但是,我希望您发现它很有用,并且如果您真的需要,可以将其改装为单队列模型。