62

我最近发现了这个成语,我想知道我是否缺少一些东西。我从未见过它使用过。我在野外使用过的几乎所有 Java 代码都倾向于将数据放入字符串或缓冲区中,而不是像这个例子那样(例如使用 HttpClient 和 XML API):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

该代码使用 Unix 管道样式技术来防止将 XML 数据的多个副本保存在内存中。它使用 HTTP Post 输出流和 DOM 加载/保存 API 将 XML 文档序列化为 HTTP 请求的内容。据我所知,它通过很少的额外代码(仅用于RunnablePipedInputStream和的几行PipedOutputStream)最大限度地减少了内存的使用。

那么,这个成语有什么问题呢?如果这个成语没有错,为什么我没看到呢?

编辑:澄清PipedInputStreamPipedOutputStream替换随处可见的样板缓冲区副本,它们还允许您同时处理传入数据并写出处理后的数据。他们不使用操作系统管道。

4

9 回答 9

56

Javadocs

通常,数据由一个线程从 PipedInputStream 对象读取,数据由其他线程写入相应的 PipedOutputStream。不建议尝试从单个线程中使用这两个对象,因为它可能会使线程死锁。

这可以部分解释为什么它不被更常用。

我认为另一个原因是许多开发人员不了解它的目的/好处。

于 2009-01-27T17:06:17.367 回答
7

在您的示例中,您正在创建两个线程来完成可以由一个线程完成的工作。并在混合中引入 I/O 延迟。

你有更好的例子吗?还是我刚刚回答了你的问题。


将一些评论(至少我对它们的看法)拉到主要回复中:

  • 并发将复杂性引入应用程序。您现在必须关注独立数据流的排序,而不是处理单个线性数据流。在某些情况下,增加的复杂性可能是合理的,特别是如果您可以利用多个内核/CPU 来执行 CPU 密集型工作。
  • 如果您处于可以从并发操作中受益的情况,通常有更好的方法来协调线程之间的数据流。例如,使用并发队列在线程之间传递对象,而不是将管道流包装在对象流中。
  • 当您有多个线程执行文本处理时,管道流可能是一个很好的解决方案,例如 Unix 管道(例如:grep | sort)。

在特定示例中,管道流允许使用 HttpClient 提供的现有 RequestEntity 实现类。我认为更好的解决方案是创建一个新的实现类,如下所示,因为该示例最终是一个顺序操作,无法从并发实现的复杂性和开销中受益。虽然我将 RequestEntity 显示为匿名类,但可重用性表明它应该是一流的类。

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});
于 2009-01-27T16:48:13.503 回答
7

我也是最近才发现 PipedInputStream/PipedOutputStream 类。

我正在开发一个需要通过 SSH 在远程服务器上执行命令的 Eclipse 插件。我正在使用JSch和 Channel API 从输入流读取并写入输出流。但我需要通过输入流提供命令并从输出流中读取响应。这就是 PipedInput/OutputStream 的用武之地。

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();
于 2009-01-27T17:48:44.923 回答
4

另外,回到最初的例子:不,它也不能完全减少内存使用。构建 DOM 树,完成内存缓冲——虽然这比全字节数组副本要好,但并没有那么好。但是在这种情况下缓冲会更慢;并且还创建了一个额外的线程——您不能在单个线程中使用 PipedInput/OutputStream 对。

有时 PipedXxxStreams 很有用,但它们没有被更多使用的原因是因为它们通常不是正确的解决方案。它们适用于线程间通信,这就是我使用它们的价值所在。只是考虑到 SOA 如何将大多数此类边界推到服务之间而不是线程之间,因此没有那么多用例。

于 2009-01-27T20:50:59.317 回答
3

这是管道有意义的用例:

假设您有一个第三方库,例如 xslt 映射器或加密库,其接口如下:doSomething(inputStream, outputStream)。而且您不想在通过线路发送之前缓冲结果。Apache 和其他客户端不允许直接访问线路输出流。您可以获得的最接近的是在请求实体对象中获取输出流 - 在偏移量处,在写入标头之后。但由于这是在幕后,将输入流和输出流传递给第三方库仍然不够。管道是解决这个问题的好方法。

顺便说一句,我写了一个 Apache 的 HTTP 客户端 API [PipedApacheClientOutputStream]的反转,它使用 Apache Commons HTTP Client 4.3.4 为 HTTP POST 提供了一个 OutputStream 接口。这是一个管道流可能有意义的示例。

于 2016-02-11T01:21:33.197 回答
2

我前一段时间尝试使用这些类,但我忘记了细节。但我确实发现他们的实施存在致命缺陷。我不记得它是什么,但我有一个偷偷摸摸的记忆,它可能是一个竞争条件,这意味着它们偶尔会死锁(是的,当然我在单独的线程中使用它们:它们根本不能用于单线程,并没有被设计成)。

我可能会看看他们的源代码,看看我是否能看到问题所在。

于 2009-01-27T20:27:45.947 回答
1

java.io 管道有太多的上下文切换(每字节读/写),它们的 java.nio 对应物要求你有一些 NIO 背景和正确使用通道和东西,这是我自己使用阻塞队列的管道实现单个生产者/消费者将执行快速且扩展良好:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-19T12:28:00.357 回答
0

那么,这个成语有什么问题呢?如果这个成语没有错,为什么我没看到呢?

编辑:澄清一下,PipedInputStream 和 PipedOutputStream 替换了随处可见的样板缓冲区逐个缓冲区副本,它们还允许您在写出处理后的数据的同时处理传入数据。他们不使用操作系统管道。

您已经说明了它的作用,但没有说明您这样做的原因。

如果您认为这会减少使用的资源(cpu/内存)或提高性能,那么它也不会这样做。但是,它会使您的代码更加复杂。

基本上你有一个解决方案,没有它解决的问题。

于 2009-01-27T20:02:27.183 回答
0

PipedInputStream 和 PipeOutputStream 将在阻塞等待对方读取或写入满或空缓冲区时将其线程休眠1 秒。不使用。

于 2020-04-10T12:32:28.303 回答