6

我正在编写一个应用程序,其中涉及将相当大的数据块写入 OutputStream(属于 Socket)。使这有点复杂的是,通常有多个线程试图写入同一个 OutputStream。目前,我对其进行了设计,以便将数据写入其中的 OutputStream 位于其自己的线程中。该线程包含一个队列(LinkedList),它轮询字节数组并尽快将它们写入。

private class OutputStreamWriter implements Runnable {

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

    public void run() {
        OutputStream outputStream = User.this.outputStream;
        while (true) {
            try {
                if (chunkQueue.isEmpty()) {
                    Thread.sleep(100);
                    continue;
                }
                outputStream.write(chunkQueue.poll());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

这种设计的问题在于,随着越来越多的写入发生,越来越多的数据排队,并且写入速度没有任何提高。最初,当数据被放入队列时,它实际上是立即写入的。然后大约15秒左右后,数据开始滞后;从数据排队到数据实际写入的时间会有延迟。随着时间的推移,这种延迟变得越来越长。这是非常明显的。

解决此问题的一种方法是某种 ConcurrentOutputStream 实现,该实现允许在不阻塞的情况下发送数据,这样就不会开始备份写入(哎呀,那时就不需要队列了)。我不知道是否有这样的实现——我一直找不到——而且我个人认为甚至不可能写一个。

那么,有人对我如何重新设计它有任何建议吗?

4

3 回答 3

4

套接字的吞吐量是有限的;如果它比您的数据生成吞吐量慢,则必须缓冲数据,这是没有办法的。“同时”写作根本没有帮助。

您可以考虑在排队数据超过一定限制时暂停数据生成,以减少内存消耗。

于 2012-11-13T05:27:15.430 回答
0

我需要一个过滤器来拦截需要尽快关闭数据库连接的慢速连接,所以我最初使用 Java 管道,但是当仔细观察它们的实现时,它都是同步的,所以我最终使用一个小缓冲区和阻塞队列创建了自己的 QueueInputStream将缓冲区放入队列中一旦已满,它是无锁的,除非 LinkedBlockingQueue 使用的锁定条件在小缓冲区的帮助下应该很便宜,此类仅用于单个生产者和消费者每个实例,您应该传递一个 ExecutorService 以开始将排队的字节流式传输到最终的 OutputStream:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-18T17:50:10.457 回答
0

我同意@irreputable 的观点,即并发写作丝毫没有帮助。相反,您应该关注生产方面,即您已经拥有的东西。

  1. 使用 BlockingQueue 而不是 LinkedList。

  2. 使用队列的阻塞轮询操作,而不是仅仅盲目睡眠 100msl,根据定义,这将平均浪费 50% 的时间。在很长一段时间内,这真的可以加起来。

于 2012-11-13T07:46:40.520 回答