383

我正处于开发阶段,我有两个模块,我从一个模块中得到输出OutputStream,第二个模块只接受InputStream. 你知道如何转换OutputStreamInputStream(反之亦然,我的意思是真的这样)我将能够连接这两个部分吗?

谢谢

4

12 回答 12

248

似乎有很多链接和其他类似的东西,但没有使用管道的实际代码。使用java.io.PipedInputStreamand的好处java.io.PipedOutputStream是没有额外的内存消耗。ByteArrayOutputStream.toByteArray()返回原始缓冲区的副本,这意味着无论您在内存中拥有什么,现在都有两个副本。然后写入一个InputStream意味着您现在拥有三个数据副本。

编码:

// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
new Thread(new Runnable() {
    public void run () {
        try {
            // write the original OutputStream to the PipedOutputStream
            // note that in order for the below method to work, you need
            // to ensure that the data has finished writing to the
            // ByteArrayOutputStream
            originalByteArrayOutputStream.writeTo(out);
        }
        catch (IOException e) {
            // logging and exception handling should go here
        }
        finally {
            // close the PipedOutputStream here because we're done writing data
            // once this thread has completed its run
            if (out != null) {
                // close the PipedOutputStream cleanly
                out.close();
            }
        }   
    }
}).start();

此代码假定originalByteArrayOutputStream是 aByteArrayOutputStream因为它通常是唯一可用的输出流,除非您正在写入文件。我希望这有帮助!这样做的好处是,因为它在一个单独的线程中,所以它也是并行工作的,所以任何消耗你的输入流的东西也会从你的旧输出流中流出。这是有益的,因为缓冲区可以保持较小,您将有更少的延迟和更少的内存使用。

于 2014-05-26T16:20:41.210 回答
120

AnOutputStream是您向其中写入数据的地方。如果某个模块公开了一个OutputStream,则期望在另一端有一些读取。

InputStream另一方面,暴露 的东西表明您将需要收听此流,并且将有您可以读取的数据。

所以可以将一个连接InputStream到一个OutputStream

InputStream----read---> intermediateBytes[n] ----write----> OutputStream

正如有人提到的,这就是IOUtilscopy()中的方法可以让你做的事情。走另一条路是没有意义的……希望这是有道理的

更新:

当然,我想得越多,我就越能看到这实际上是一个要求。我知道一些评论提到了Piped输入/输出流,但还有另一种可能性。

如果公开的输出流是 a ByteArrayOutputStream,那么您始终可以通过调用该toByteArray()方法来获取完整内容。ByteArrayInputStream然后,您可以使用子类创建输入流包装器。这两个是伪流,它们基本上都只是包装一个字节数组。因此,以这种方式使用流在技术上是可行的,但对我来说仍然很奇怪......

于 2011-04-25T13:36:51.697 回答
47

As input and output streams are just start and end point, the solution is to temporary store data in byte array. So you must create intermediate ByteArrayOutputStream, from which you create byte[] that is used as input for new ByteArrayInputStream.

public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){ 
  //create temporary bayte array output stream
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  doFirstThing(inStream, baos);
  //create input stream from baos
  InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray()); 
  doSecondThing(isFromFirstData, outStream);
}

Hope it helps.

于 2014-05-08T08:35:55.617 回答
26
ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream;
byte[] bytes = buffer.toByteArray();
InputStream inputStream = new ByteArrayInputStream(bytes);
于 2017-01-27T07:13:47.753 回答
20

您将需要一个中间类来缓冲。每次InputStream.read(byte[]...)调用时,缓冲类将用从传入的下一个块填充传入的字节数组OutputStream.write(byte[]...)。由于块的大小可能不同,适配器类将需要存储一定量,直到它有足够的空间来填充读取缓冲区和/或能够存储任何缓冲区溢出。

本文对解决此问题的几种不同方法进行了很好的细分:

http://blog.ostermiller.org/convert-java-outputstream-inputstream

于 2012-05-26T18:53:00.457 回答
18

easystream开源库直接支持将 OutputStream 转换为 InputStream: http : //io-tools.sourceforge.net/easystream/tutorial/tutorial.html

// create conversion
final OutputStreamToInputStream<Void> out = new OutputStreamToInputStream<Void>() {
    @Override
    protected Void doRead(final InputStream in) throws Exception {
           LibraryClass2.processDataFromInputStream(in);
           return null;
        }
    };
try {   
     LibraryClass1.writeDataToTheOutputStream(out);
} finally {
     // don't miss the close (or a thread would not terminate correctly).
     out.close();
}

他们还列出了其他选项:http: //io-tools.sourceforge.net/easystream/outputstream_to_inputstream/implementations.html

  • 将数据写入内存缓冲区 (ByteArrayOutputStream) 获取 byteArray 并使用 ByteArrayInputStream 再次读取。如果您确定您的数据适合内存,这是最好的方法。
  • 将您的数据复制到临时文件并读回。
  • 使用管道:这是内存使用和速度的最佳方法(您可以充分利用多核处理器),也是 Sun 提供的标准解决方案。
  • 使用 easystream 库中的 InputStreamFromOutputStream 和 OutputStreamToInputStream。
于 2013-06-06T23:19:15.733 回答
13

ByteArrayOutputStream我在将 a 转换为 a时遇到了同样的问题,ByteArrayInputStream并通过使用派生类解决了这个问题,该派生类ByteArrayOutputStream能够返回ByteArrayInputStream使用ByteArrayOutputStream. 这种方式不使用额外的内存,并且“转换”非常快:

package info.whitebyte.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * This class extends the ByteArrayOutputStream by 
 * providing a method that returns a new ByteArrayInputStream
 * which uses the internal byte array buffer. This buffer
 * is not copied, so no additional memory is used. After
 * creating the ByteArrayInputStream the instance of the
 * ByteArrayInOutStream can not be used anymore.
 * <p>
 * The ByteArrayInputStream can be retrieved using <code>getInputStream()</code>.
 * @author Nick Russler
 */
public class ByteArrayInOutStream extends ByteArrayOutputStream {
    /**
     * Creates a new ByteArrayInOutStream. The buffer capacity is
     * initially 32 bytes, though its size increases if necessary.
     */
    public ByteArrayInOutStream() {
        super();
    }

    /**
     * Creates a new ByteArrayInOutStream, with a buffer capacity of
     * the specified size, in bytes.
     *
     * @param   size   the initial size.
     * @exception  IllegalArgumentException if size is negative.
     */
    public ByteArrayInOutStream(int size) {
        super(size);
    }

    /**
     * Creates a new ByteArrayInputStream that uses the internal byte array buffer 
     * of this ByteArrayInOutStream instance as its buffer array. The initial value 
     * of pos is set to zero and the initial value of count is the number of bytes 
     * that can be read from the byte array. The buffer array is not copied. This 
     * instance of ByteArrayInOutStream can not be used anymore after calling this
     * method.
     * @return the ByteArrayInputStream instance
     */
    public ByteArrayInputStream getInputStream() {
        // create new ByteArrayInputStream that respects the current count
        ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count);

        // set the buffer of the ByteArrayOutputStream 
        // to null so it can't be altered anymore
        this.buf = null;

        return in;
    }
}

我把东西放在github上:https ://github.com/nickrussler/ByteArrayInOutStream

于 2014-09-04T19:57:50.430 回答
2

io-extras可能很有用。例如,如果您想使用 gzipInputStreamGZIPOutputStream希望它同步发生(使用默认缓冲区大小 8192):

InputStream is = ...
InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o));

请注意,该库具有 100% 的单元测试覆盖率(当然值得!)并且位于 Maven Central 上。Maven依赖是:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>io-extras</artifactId>
  <version>0.1</version>
</dependency>

请务必检查更高版本。

于 2018-04-26T00:43:02.613 回答
0

从我的角度来看,java.io.PipedInputStream/java.io.PipedOutputStream 是考虑的最佳选择。在某些情况下,您可能想要使用 ByteArrayInputStream/ByteArrayOutputStream。问题是您需要复制缓冲区才能将 ByteArrayOutputStream 转换为 ByteArrayInputStream。ByteArrayOutpuStream/ByteArrayInputStream 也限制为 2GB。这是我为绕过 ByteArrayOutputStream/ByteArrayInputStream 限制而编写的 OutpuStream/InputStream 实现(Scala 代码,但对于 java 开发人员来说很容易理解):

import java.io.{IOException, InputStream, OutputStream}

import scala.annotation.tailrec

/** Acts as a replacement for ByteArrayOutputStream
  *
  */
class HugeMemoryOutputStream(capacity: Long) extends OutputStream {
  private val PAGE_SIZE: Int = 1024000
  private val ALLOC_STEP: Int = 1024

  /** Pages array
    *
    */
  private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]]

  /** Allocated pages count
    *
    */
  private var pageCount: Int = 0

  /** Allocated bytes count
    *
    */
  private var allocatedBytes: Long = 0

  /** Current position in stream
    *
    */
  private var position: Long = 0

  /** Stream length
    *
    */
  private var length: Long = 0

  allocSpaceIfNeeded(capacity)

  /** Gets page count based on given length
    *
    * @param length   Buffer length
    * @return         Page count to hold the specified amount of data
    */
  private def getPageCount(length: Long) = {
    var pageCount = (length / PAGE_SIZE).toInt + 1

    if ((length % PAGE_SIZE) == 0) {
      pageCount -= 1
    }

    pageCount
  }

  /** Extends pages array
    *
    */
  private def extendPages(): Unit = {
    if (streamBuffers.isEmpty) {
      streamBuffers = new Array[Array[Byte]](ALLOC_STEP)
    }
    else {
      val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP)
      Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length)
      streamBuffers = newStreamBuffers
    }

    pageCount = streamBuffers.length
  }

  /** Ensures buffers are bug enough to hold specified amount of data
    *
    * @param value  Amount of data
    */
  private def allocSpaceIfNeeded(value: Long): Unit = {
    @tailrec
    def allocSpaceIfNeededIter(value: Long): Unit = {
      val currentPageCount = getPageCount(allocatedBytes)
      val neededPageCount = getPageCount(value)

      if (currentPageCount < neededPageCount) {
        if (currentPageCount == pageCount) extendPages()

        streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE)
        allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE

        allocSpaceIfNeededIter(value)
      }
    }

    if (value < 0) throw new Error("AllocSpaceIfNeeded < 0")
    if (value > 0) {
      allocSpaceIfNeededIter(value)

      length = Math.max(value, length)
      if (position > length) position = length
    }
  }

  /**
    * Writes the specified byte to this output stream. The general
    * contract for <code>write</code> is that one byte is written
    * to the output stream. The byte to be written is the eight
    * low-order bits of the argument <code>b</code>. The 24
    * high-order bits of <code>b</code> are ignored.
    * <p>
    * Subclasses of <code>OutputStream</code> must provide an
    * implementation for this method.
    *
    * @param      b the <code>byte</code>.
    */
  @throws[IOException]
  override def write(b: Int): Unit = {
    val buffer: Array[Byte] = new Array[Byte](1)

    buffer(0) = b.toByte

    write(buffer)
  }

  /**
    * Writes <code>len</code> bytes from the specified byte array
    * starting at offset <code>off</code> to this output stream.
    * The general contract for <code>write(b, off, len)</code> is that
    * some of the bytes in the array <code>b</code> are written to the
    * output stream in order; element <code>b[off]</code> is the first
    * byte written and <code>b[off+len-1]</code> is the last byte written
    * by this operation.
    * <p>
    * The <code>write</code> method of <code>OutputStream</code> calls
    * the write method of one argument on each of the bytes to be
    * written out. Subclasses are encouraged to override this method and
    * provide a more efficient implementation.
    * <p>
    * If <code>b</code> is <code>null</code>, a
    * <code>NullPointerException</code> is thrown.
    * <p>
    * If <code>off</code> is negative, or <code>len</code> is negative, or
    * <code>off+len</code> is greater than the length of the array
    * <code>b</code>, then an <tt>IndexOutOfBoundsException</tt> is thrown.
    *
    * @param      b   the data.
    * @param      off the start offset in the data.
    * @param      len the number of bytes to write.
    */
  @throws[IOException]
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    @tailrec
    def writeIter(b: Array[Byte], off: Int, len: Int): Unit = {
      val currentPage: Int = (position / PAGE_SIZE).toInt
      val currentOffset: Int = (position % PAGE_SIZE).toInt

      if (len != 0) {
        val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len)
        Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength)

        position += currentLength

        writeIter(b, off + currentLength, len - currentLength)
      }
    }

    allocSpaceIfNeeded(position + len)
    writeIter(b, off, len)
  }

  /** Gets an InputStream that points to HugeMemoryOutputStream buffer
    *
    * @return InputStream
    */
  def asInputStream(): InputStream = {
    new HugeMemoryInputStream(streamBuffers, length)
  }

  private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream {
    /** Current position in stream
      *
      */
    private var position: Long = 0

    /**
      * Reads the next byte of data from the input stream. The value byte is
      * returned as an <code>int</code> in the range <code>0</code> to
      * <code>255</code>. If no byte is available because the end of the stream
      * has been reached, the value <code>-1</code> is returned. This method
      * blocks until input data is available, the end of the stream is detected,
      * or an exception is thrown.
      *
      * <p> A subclass must provide an implementation of this method.
      *
      * @return the next byte of data, or <code>-1</code> if the end of the
      *         stream is reached.
      */
    @throws[IOException]
    def read: Int = {
      val buffer: Array[Byte] = new Array[Byte](1)

      if (read(buffer) == 0) throw new Error("End of stream")
      else buffer(0)
    }

    /**
      * Reads up to <code>len</code> bytes of data from the input stream into
      * an array of bytes.  An attempt is made to read as many as
      * <code>len</code> bytes, but a smaller number may be read.
      * The number of bytes actually read is returned as an integer.
      *
      * <p> This method blocks until input data is available, end of file is
      * detected, or an exception is thrown.
      *
      * <p> If <code>len</code> is zero, then no bytes are read and
      * <code>0</code> is returned; otherwise, there is an attempt to read at
      * least one byte. If no byte is available because the stream is at end of
      * file, the value <code>-1</code> is returned; otherwise, at least one
      * byte is read and stored into <code>b</code>.
      *
      * <p> The first byte read is stored into element <code>b[off]</code>, the
      * next one into <code>b[off+1]</code>, and so on. The number of bytes read
      * is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
      * bytes actually read; these bytes will be stored in elements
      * <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
      * leaving elements <code>b[off+</code><i>k</i><code>]</code> through
      * <code>b[off+len-1]</code> unaffected.
      *
      * <p> In every case, elements <code>b[0]</code> through
      * <code>b[off]</code> and elements <code>b[off+len]</code> through
      * <code>b[b.length-1]</code> are unaffected.
      *
      * <p> The <code>read(b,</code> <code>off,</code> <code>len)</code> method
      * for class <code>InputStream</code> simply calls the method
      * <code>read()</code> repeatedly. If the first such call results in an
      * <code>IOException</code>, that exception is returned from the call to
      * the <code>read(b,</code> <code>off,</code> <code>len)</code> method.  If
      * any subsequent call to <code>read()</code> results in a
      * <code>IOException</code>, the exception is caught and treated as if it
      * were end of file; the bytes read up to that point are stored into
      * <code>b</code> and the number of bytes read before the exception
      * occurred is returned. The default implementation of this method blocks
      * until the requested amount of input data <code>len</code> has been read,
      * end of file is detected, or an exception is thrown. Subclasses are encouraged
      * to provide a more efficient implementation of this method.
      *
      * @param      b   the buffer into which the data is read.
      * @param      off the start offset in array <code>b</code>
      *                 at which the data is written.
      * @param      len the maximum number of bytes to read.
      * @return the total number of bytes read into the buffer, or
      *         <code>-1</code> if there is no more data because the end of
      *         the stream has been reached.
      * @see java.io.InputStream#read()
      */
    @throws[IOException]
    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      @tailrec
      def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = {
        val currentPage: Int = (position / PAGE_SIZE).toInt
        val currentOffset: Int = (position % PAGE_SIZE).toInt

        val count: Int = Math.min(len, length - position).toInt

        if (count == 0 || position >= length) acc
        else {
          val currentLength = Math.min(PAGE_SIZE - currentOffset, count)
          Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength)

          position += currentLength

          readIter(acc + currentLength, b, off + currentLength, len - currentLength)
        }
      }

      readIter(0, b, off, len)
    }

    /**
      * Skips over and discards <code>n</code> bytes of data from this input
      * stream. The <code>skip</code> method may, for a variety of reasons, end
      * up skipping over some smaller number of bytes, possibly <code>0</code>.
      * This may result from any of a number of conditions; reaching end of file
      * before <code>n</code> bytes have been skipped is only one possibility.
      * The actual number of bytes skipped is returned. If <code>n</code> is
      * negative, the <code>skip</code> method for class <code>InputStream</code> always
      * returns 0, and no bytes are skipped. Subclasses may handle the negative
      * value differently.
      *
      * The <code>skip</code> method of this class creates a
      * byte array and then repeatedly reads into it until <code>n</code> bytes
      * have been read or the end of the stream has been reached. Subclasses are
      * encouraged to provide a more efficient implementation of this method.
      * For instance, the implementation may depend on the ability to seek.
      *
      * @param      n the number of bytes to be skipped.
      * @return the actual number of bytes skipped.
      */
    @throws[IOException]
    override def skip(n: Long): Long = {
      if (n < 0) 0
      else {
        position = Math.min(position + n, length)
        length - position
      }
    }
  }
}

易于使用,无缓冲区重复,无 2GB 内存限制

val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/)

out.write(...)
...

val in1: InputStream = out.asInputStream()

in1.read(...)
...

val in2: InputStream = out.asInputStream()

in2.read(...)
...
于 2018-04-10T17:16:04.237 回答
-1

如果您想从 InputStream 制作 OutputStream ,则存在一个基本问题。写入 OutputStream 的方法会阻塞,直到完成。所以写完方法后就可以得到结果了。这有两个后果:

  1. 如果只使用一个线程,则需要等到所有内容都写入(因此您需要将流的数据存储在内存或磁盘中)。
  2. 如果要在数据完成之前访问数据,则需要第二个线程。

变体 1 可以使用字节数组或字段来实现。变体 1 可以使用 pipies 实现(直接或使用额外的抽象 - 例如 RingBuffer 或其他评论中的 google lib)。

事实上,对于标准 java,没有其他方法可以解决这个问题。每个解决方案都是其中之一的实现。

有一个概念称为“延续”(有关详细信息,请参阅维基百科)。在这种情况下,基本上这意味着:

  • 有一个特殊的输出流,它需要一定量的数据
  • 如果达到数量,则流将控制权交给它的对应物,这是一个特殊的输入流
  • 输入流使数据量在被读取之前可用,之后,它将控制权传递回输出流

虽然有些语言内置了这个概念,但对于 java,您需要一些“魔法”。例如,来自 apache 的“commons-javaflow”为 java 实现了此类。缺点是这需要在构建时进行一些特殊的字节码修改。因此,将所有东西放在一个带有自定义构建脚本的额外库中是有意义的。

于 2013-10-11T20:48:31.890 回答
-2

旧帖子但可能对其他人有所帮助,请使用这种方式:

OutputStream out = new ByteArrayOutputStream();
...
out.write();
...
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toString().getBytes()));
于 2013-10-15T09:01:02.993 回答
-2

尽管您无法将 OutputStream 转换为 InputStream,但 java 提供了一种使用 PipedOutputStream 和 PipedInputStream 的方法,您可以将数据写入 PipedOutputStream 以通过关联的 PipedInputStream 变得可用。
有时,在处理需要将 InputStream 实例而不是 OutputStream 实例传递给它们的第三方库时,我遇到了类似的情况。
我解决此问题的方法是使用 PipedInputStream 和 PipedOutputStream。
顺便说一句,它们使用起来很棘手,您必须使用多线程来实现您想要的。我最近在 github 上发布了一个你可以使用的实现。
这是链接。您可以通过 wiki 了解如何使用它。

于 2017-04-25T08:30:20.930 回答