我正处于开发阶段,我有两个模块,我从一个模块中得到输出OutputStream,第二个模块只接受InputStream. 你知道如何转换OutputStream为InputStream(反之亦然,我的意思是真的这样)我将能够连接这两个部分吗?
谢谢
我正处于开发阶段,我有两个模块,我从一个模块中得到输出OutputStream,第二个模块只接受InputStream. 你知道如何转换OutputStream为InputStream(反之亦然,我的意思是真的这样)我将能够连接这两个部分吗?
谢谢
似乎有很多链接和其他类似的东西,但没有使用管道的实际代码。使用java.io.PipedInputStreamand的好处java.io.PipedOutputStream是没有额外的内存消耗。ByteArrayOutputStream.toByteArray()返回原始缓冲区的副本,这意味着无论您在内存中拥有什么,现在都有两个副本。然后写入一个InputStream意味着您现在拥有三个数据副本。
编码:
// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
new Thread(new Runnable() {
public void run () {
try {
// write the original OutputStream to the PipedOutputStream
// note that in order for the below method to work, you need
// to ensure that the data has finished writing to the
// ByteArrayOutputStream
originalByteArrayOutputStream.writeTo(out);
}
catch (IOException e) {
// logging and exception handling should go here
}
finally {
// close the PipedOutputStream here because we're done writing data
// once this thread has completed its run
if (out != null) {
// close the PipedOutputStream cleanly
out.close();
}
}
}
}).start();
此代码假定originalByteArrayOutputStream是 aByteArrayOutputStream因为它通常是唯一可用的输出流,除非您正在写入文件。我希望这有帮助!这样做的好处是,因为它在一个单独的线程中,所以它也是并行工作的,所以任何消耗你的输入流的东西也会从你的旧输出流中流出。这是有益的,因为缓冲区可以保持较小,您将有更少的延迟和更少的内存使用。
AnOutputStream是您向其中写入数据的地方。如果某个模块公开了一个OutputStream,则期望在另一端有一些读取。
InputStream另一方面,暴露 的东西表明您将需要收听此流,并且将有您可以读取的数据。
所以可以将一个连接InputStream到一个OutputStream
InputStream----read---> intermediateBytes[n] ----write----> OutputStream
正如有人提到的,这就是IOUtilscopy()中的方法可以让你做的事情。走另一条路是没有意义的……希望这是有道理的
更新:
当然,我想得越多,我就越能看到这实际上是一个要求。我知道一些评论提到了Piped输入/输出流,但还有另一种可能性。
如果公开的输出流是 a ByteArrayOutputStream,那么您始终可以通过调用该toByteArray()方法来获取完整内容。ByteArrayInputStream然后,您可以使用子类创建输入流包装器。这两个是伪流,它们基本上都只是包装一个字节数组。因此,以这种方式使用流在技术上是可行的,但对我来说仍然很奇怪......
As input and output streams are just start and end point, the solution is to temporary store data in byte array. So you must create intermediate ByteArrayOutputStream, from which you create byte[] that is used as input for new ByteArrayInputStream.
public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){
//create temporary bayte array output stream
ByteArrayOutputStream baos = new ByteArrayOutputStream();
doFirstThing(inStream, baos);
//create input stream from baos
InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray());
doSecondThing(isFromFirstData, outStream);
}
Hope it helps.
ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream;
byte[] bytes = buffer.toByteArray();
InputStream inputStream = new ByteArrayInputStream(bytes);
您将需要一个中间类来缓冲。每次InputStream.read(byte[]...)调用时,缓冲类将用从传入的下一个块填充传入的字节数组OutputStream.write(byte[]...)。由于块的大小可能不同,适配器类将需要存储一定量,直到它有足够的空间来填充读取缓冲区和/或能够存储任何缓冲区溢出。
本文对解决此问题的几种不同方法进行了很好的细分:
http://blog.ostermiller.org/convert-java-outputstream-inputstream
easystream开源库直接支持将 OutputStream 转换为 InputStream: http : //io-tools.sourceforge.net/easystream/tutorial/tutorial.html
// create conversion
final OutputStreamToInputStream<Void> out = new OutputStreamToInputStream<Void>() {
@Override
protected Void doRead(final InputStream in) throws Exception {
LibraryClass2.processDataFromInputStream(in);
return null;
}
};
try {
LibraryClass1.writeDataToTheOutputStream(out);
} finally {
// don't miss the close (or a thread would not terminate correctly).
out.close();
}
他们还列出了其他选项:http: //io-tools.sourceforge.net/easystream/outputstream_to_inputstream/implementations.html
ByteArrayOutputStream我在将 a 转换为 a时遇到了同样的问题,ByteArrayInputStream并通过使用派生类解决了这个问题,该派生类ByteArrayOutputStream能够返回ByteArrayInputStream使用ByteArrayOutputStream. 这种方式不使用额外的内存,并且“转换”非常快:
package info.whitebyte.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* This class extends the ByteArrayOutputStream by
* providing a method that returns a new ByteArrayInputStream
* which uses the internal byte array buffer. This buffer
* is not copied, so no additional memory is used. After
* creating the ByteArrayInputStream the instance of the
* ByteArrayInOutStream can not be used anymore.
* <p>
* The ByteArrayInputStream can be retrieved using <code>getInputStream()</code>.
* @author Nick Russler
*/
public class ByteArrayInOutStream extends ByteArrayOutputStream {
/**
* Creates a new ByteArrayInOutStream. The buffer capacity is
* initially 32 bytes, though its size increases if necessary.
*/
public ByteArrayInOutStream() {
super();
}
/**
* Creates a new ByteArrayInOutStream, with a buffer capacity of
* the specified size, in bytes.
*
* @param size the initial size.
* @exception IllegalArgumentException if size is negative.
*/
public ByteArrayInOutStream(int size) {
super(size);
}
/**
* Creates a new ByteArrayInputStream that uses the internal byte array buffer
* of this ByteArrayInOutStream instance as its buffer array. The initial value
* of pos is set to zero and the initial value of count is the number of bytes
* that can be read from the byte array. The buffer array is not copied. This
* instance of ByteArrayInOutStream can not be used anymore after calling this
* method.
* @return the ByteArrayInputStream instance
*/
public ByteArrayInputStream getInputStream() {
// create new ByteArrayInputStream that respects the current count
ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count);
// set the buffer of the ByteArrayOutputStream
// to null so it can't be altered anymore
this.buf = null;
return in;
}
}
我把东西放在github上:https ://github.com/nickrussler/ByteArrayInOutStream
库io-extras可能很有用。例如,如果您想使用 gzipInputStream并GZIPOutputStream希望它同步发生(使用默认缓冲区大小 8192):
InputStream is = ...
InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o));
请注意,该库具有 100% 的单元测试覆盖率(当然值得!)并且位于 Maven Central 上。Maven依赖是:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>io-extras</artifactId>
<version>0.1</version>
</dependency>
请务必检查更高版本。
从我的角度来看,java.io.PipedInputStream/java.io.PipedOutputStream 是考虑的最佳选择。在某些情况下,您可能想要使用 ByteArrayInputStream/ByteArrayOutputStream。问题是您需要复制缓冲区才能将 ByteArrayOutputStream 转换为 ByteArrayInputStream。ByteArrayOutpuStream/ByteArrayInputStream 也限制为 2GB。这是我为绕过 ByteArrayOutputStream/ByteArrayInputStream 限制而编写的 OutpuStream/InputStream 实现(Scala 代码,但对于 java 开发人员来说很容易理解):
import java.io.{IOException, InputStream, OutputStream}
import scala.annotation.tailrec
/** Acts as a replacement for ByteArrayOutputStream
*
*/
class HugeMemoryOutputStream(capacity: Long) extends OutputStream {
private val PAGE_SIZE: Int = 1024000
private val ALLOC_STEP: Int = 1024
/** Pages array
*
*/
private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]]
/** Allocated pages count
*
*/
private var pageCount: Int = 0
/** Allocated bytes count
*
*/
private var allocatedBytes: Long = 0
/** Current position in stream
*
*/
private var position: Long = 0
/** Stream length
*
*/
private var length: Long = 0
allocSpaceIfNeeded(capacity)
/** Gets page count based on given length
*
* @param length Buffer length
* @return Page count to hold the specified amount of data
*/
private def getPageCount(length: Long) = {
var pageCount = (length / PAGE_SIZE).toInt + 1
if ((length % PAGE_SIZE) == 0) {
pageCount -= 1
}
pageCount
}
/** Extends pages array
*
*/
private def extendPages(): Unit = {
if (streamBuffers.isEmpty) {
streamBuffers = new Array[Array[Byte]](ALLOC_STEP)
}
else {
val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP)
Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length)
streamBuffers = newStreamBuffers
}
pageCount = streamBuffers.length
}
/** Ensures buffers are bug enough to hold specified amount of data
*
* @param value Amount of data
*/
private def allocSpaceIfNeeded(value: Long): Unit = {
@tailrec
def allocSpaceIfNeededIter(value: Long): Unit = {
val currentPageCount = getPageCount(allocatedBytes)
val neededPageCount = getPageCount(value)
if (currentPageCount < neededPageCount) {
if (currentPageCount == pageCount) extendPages()
streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE)
allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE
allocSpaceIfNeededIter(value)
}
}
if (value < 0) throw new Error("AllocSpaceIfNeeded < 0")
if (value > 0) {
allocSpaceIfNeededIter(value)
length = Math.max(value, length)
if (position > length) position = length
}
}
/**
* Writes the specified byte to this output stream. The general
* contract for <code>write</code> is that one byte is written
* to the output stream. The byte to be written is the eight
* low-order bits of the argument <code>b</code>. The 24
* high-order bits of <code>b</code> are ignored.
* <p>
* Subclasses of <code>OutputStream</code> must provide an
* implementation for this method.
*
* @param b the <code>byte</code>.
*/
@throws[IOException]
override def write(b: Int): Unit = {
val buffer: Array[Byte] = new Array[Byte](1)
buffer(0) = b.toByte
write(buffer)
}
/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to this output stream.
* The general contract for <code>write(b, off, len)</code> is that
* some of the bytes in the array <code>b</code> are written to the
* output stream in order; element <code>b[off]</code> is the first
* byte written and <code>b[off+len-1]</code> is the last byte written
* by this operation.
* <p>
* The <code>write</code> method of <code>OutputStream</code> calls
* the write method of one argument on each of the bytes to be
* written out. Subclasses are encouraged to override this method and
* provide a more efficient implementation.
* <p>
* If <code>b</code> is <code>null</code>, a
* <code>NullPointerException</code> is thrown.
* <p>
* If <code>off</code> is negative, or <code>len</code> is negative, or
* <code>off+len</code> is greater than the length of the array
* <code>b</code>, then an <tt>IndexOutOfBoundsException</tt> is thrown.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
@throws[IOException]
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
@tailrec
def writeIter(b: Array[Byte], off: Int, len: Int): Unit = {
val currentPage: Int = (position / PAGE_SIZE).toInt
val currentOffset: Int = (position % PAGE_SIZE).toInt
if (len != 0) {
val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len)
Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength)
position += currentLength
writeIter(b, off + currentLength, len - currentLength)
}
}
allocSpaceIfNeeded(position + len)
writeIter(b, off, len)
}
/** Gets an InputStream that points to HugeMemoryOutputStream buffer
*
* @return InputStream
*/
def asInputStream(): InputStream = {
new HugeMemoryInputStream(streamBuffers, length)
}
private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream {
/** Current position in stream
*
*/
private var position: Long = 0
/**
* Reads the next byte of data from the input stream. The value byte is
* returned as an <code>int</code> in the range <code>0</code> to
* <code>255</code>. If no byte is available because the end of the stream
* has been reached, the value <code>-1</code> is returned. This method
* blocks until input data is available, the end of the stream is detected,
* or an exception is thrown.
*
* <p> A subclass must provide an implementation of this method.
*
* @return the next byte of data, or <code>-1</code> if the end of the
* stream is reached.
*/
@throws[IOException]
def read: Int = {
val buffer: Array[Byte] = new Array[Byte](1)
if (read(buffer) == 0) throw new Error("End of stream")
else buffer(0)
}
/**
* Reads up to <code>len</code> bytes of data from the input stream into
* an array of bytes. An attempt is made to read as many as
* <code>len</code> bytes, but a smaller number may be read.
* The number of bytes actually read is returned as an integer.
*
* <p> This method blocks until input data is available, end of file is
* detected, or an exception is thrown.
*
* <p> If <code>len</code> is zero, then no bytes are read and
* <code>0</code> is returned; otherwise, there is an attempt to read at
* least one byte. If no byte is available because the stream is at end of
* file, the value <code>-1</code> is returned; otherwise, at least one
* byte is read and stored into <code>b</code>.
*
* <p> The first byte read is stored into element <code>b[off]</code>, the
* next one into <code>b[off+1]</code>, and so on. The number of bytes read
* is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
* bytes actually read; these bytes will be stored in elements
* <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
* leaving elements <code>b[off+</code><i>k</i><code>]</code> through
* <code>b[off+len-1]</code> unaffected.
*
* <p> In every case, elements <code>b[0]</code> through
* <code>b[off]</code> and elements <code>b[off+len]</code> through
* <code>b[b.length-1]</code> are unaffected.
*
* <p> The <code>read(b,</code> <code>off,</code> <code>len)</code> method
* for class <code>InputStream</code> simply calls the method
* <code>read()</code> repeatedly. If the first such call results in an
* <code>IOException</code>, that exception is returned from the call to
* the <code>read(b,</code> <code>off,</code> <code>len)</code> method. If
* any subsequent call to <code>read()</code> results in a
* <code>IOException</code>, the exception is caught and treated as if it
* were end of file; the bytes read up to that point are stored into
* <code>b</code> and the number of bytes read before the exception
* occurred is returned. The default implementation of this method blocks
* until the requested amount of input data <code>len</code> has been read,
* end of file is detected, or an exception is thrown. Subclasses are encouraged
* to provide a more efficient implementation of this method.
*
* @param b the buffer into which the data is read.
* @param off the start offset in array <code>b</code>
* at which the data is written.
* @param len the maximum number of bytes to read.
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached.
* @see java.io.InputStream#read()
*/
@throws[IOException]
override def read(b: Array[Byte], off: Int, len: Int): Int = {
@tailrec
def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = {
val currentPage: Int = (position / PAGE_SIZE).toInt
val currentOffset: Int = (position % PAGE_SIZE).toInt
val count: Int = Math.min(len, length - position).toInt
if (count == 0 || position >= length) acc
else {
val currentLength = Math.min(PAGE_SIZE - currentOffset, count)
Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength)
position += currentLength
readIter(acc + currentLength, b, off + currentLength, len - currentLength)
}
}
readIter(0, b, off, len)
}
/**
* Skips over and discards <code>n</code> bytes of data from this input
* stream. The <code>skip</code> method may, for a variety of reasons, end
* up skipping over some smaller number of bytes, possibly <code>0</code>.
* This may result from any of a number of conditions; reaching end of file
* before <code>n</code> bytes have been skipped is only one possibility.
* The actual number of bytes skipped is returned. If <code>n</code> is
* negative, the <code>skip</code> method for class <code>InputStream</code> always
* returns 0, and no bytes are skipped. Subclasses may handle the negative
* value differently.
*
* The <code>skip</code> method of this class creates a
* byte array and then repeatedly reads into it until <code>n</code> bytes
* have been read or the end of the stream has been reached. Subclasses are
* encouraged to provide a more efficient implementation of this method.
* For instance, the implementation may depend on the ability to seek.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
*/
@throws[IOException]
override def skip(n: Long): Long = {
if (n < 0) 0
else {
position = Math.min(position + n, length)
length - position
}
}
}
}
易于使用,无缓冲区重复,无 2GB 内存限制
val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/)
out.write(...)
...
val in1: InputStream = out.asInputStream()
in1.read(...)
...
val in2: InputStream = out.asInputStream()
in2.read(...)
...
如果您想从 InputStream 制作 OutputStream ,则存在一个基本问题。写入 OutputStream 的方法会阻塞,直到完成。所以写完方法后就可以得到结果了。这有两个后果:
变体 1 可以使用字节数组或字段来实现。变体 1 可以使用 pipies 实现(直接或使用额外的抽象 - 例如 RingBuffer 或其他评论中的 google lib)。
事实上,对于标准 java,没有其他方法可以解决这个问题。每个解决方案都是其中之一的实现。
有一个概念称为“延续”(有关详细信息,请参阅维基百科)。在这种情况下,基本上这意味着:
虽然有些语言内置了这个概念,但对于 java,您需要一些“魔法”。例如,来自 apache 的“commons-javaflow”为 java 实现了此类。缺点是这需要在构建时进行一些特殊的字节码修改。因此,将所有东西放在一个带有自定义构建脚本的额外库中是有意义的。
旧帖子但可能对其他人有所帮助,请使用这种方式:
OutputStream out = new ByteArrayOutputStream();
...
out.write();
...
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toString().getBytes()));
尽管您无法将 OutputStream 转换为 InputStream,但 java 提供了一种使用 PipedOutputStream 和 PipedInputStream 的方法,您可以将数据写入 PipedOutputStream 以通过关联的 PipedInputStream 变得可用。
有时,在处理需要将 InputStream 实例而不是 OutputStream 实例传递给它们的第三方库时,我遇到了类似的情况。
我解决此问题的方法是使用 PipedInputStream 和 PipedOutputStream。
顺便说一句,它们使用起来很棘手,您必须使用多线程来实现您想要的。我最近在 github 上发布了一个你可以使用的实现。
这是链接。您可以通过 wiki 了解如何使用它。