78

java9 中的更新:https ://docs.oracle.com/javase/9​​/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

我看到了一些类似的,但不是我需要的线程。

我有一个服务器,它基本上会从客户端客户端 A 接收输入,并将其逐字节转发到另一个客户端客户端 B。

我想将客户端 A 的输入流与客户端 B 的输出流连接起来。这可能吗?有什么方法可以做到这一点?

此外,这些客户端正在相互发送消息,这些消息对时间有些敏感,因此无法进行缓冲。我不想要一个说 500 的缓冲区并且客户端发送 499 个字节,然后我的服务器推迟转发 500 个字节,因为它没有收到最后一个字节来填充缓冲区。

现在,我正在解析每条消息以找到它的长度,然后读取长度字节,然后转发它们。我认为(并测试)这比读取一个字节并一遍又一遍地转发一个字节要好,因为那会非常慢。出于我在上一段中所述的原因,我也不想使用缓冲区或计时器——我不希望仅仅因为缓冲区未满而等待很长时间才能通过的消息。

有什么好方法可以做到这一点?

4

10 回答 10

86

仅仅因为您使用缓冲区并不意味着流必须填充该缓冲区。换句话说,这应该没问题:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException
{
    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    {
        output.write(buffer, 0, bytesRead);
    }
}

这应该可以正常工作 - 基本上read调用将阻塞,直到有一些数据可用,但它不会等到所有数据都可以填充缓冲区。(我想它可以,而且我相信FileInputStream通常填满缓冲区,但附加到套接字的流更有可能立即为您提供数据。)

我认为至少值得首先尝试这个简单的解决方案。

于 2009-10-15T20:33:14.807 回答
78

只是使用怎么样

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

并完成它?

来自 jakarta apache commons i/o library,它已经被大量项目使用,所以你可能已经在你的类路径中拥有了 jar。

于 2012-02-28T21:53:25.460 回答
23

JDK 9InputStream#transferTo(OutputStream out)为此功能添加。

于 2017-03-10T03:27:22.983 回答
22

为了完整起见,番石榴也有一个方便的实用程序

ByteStreams.copy(input, output);
于 2013-09-08T06:18:40.527 回答
11

您可以使用循环缓冲区:

代码

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());


Maven依赖

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>


模式详情

http://ostermiller.org/utils/CircularBuffer.html

于 2011-11-25T01:58:39.063 回答
9

异步方式来实现它。

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
    Thread t = new Thread(new Runnable() {

        public void run() {
            try {
                int d;
                while ((d = inputStream.read()) != -1) {
                    out.write(d);
                }
            } catch (IOException ex) {
                //TODO make a callback on exception.
            }
        }
    });
    t.setDaemon(true);
    t.start();
}
于 2012-06-28T00:34:51.777 回答
3

BUFFER_SIZE 是要读入的卡盘大小。应大于 1kb 且小于 10MB。

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException {
    try {
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) {
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        }
    //If needed, close streams.
    } finally {
        input.close();
        output.close();
    }
}
于 2015-04-28T09:44:57.233 回答
2

使用 org.apache.commons.io.IOUtils

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

copyLarge大小 >2GB

于 2017-10-20T07:01:17.583 回答
1

这是一个干净且快速的 Scala 版本(没有 stackoverflow):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) {
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    }
  }

这可以使用>符号,例如inputstream > outputstream,也可以传入自定义缓冲区/大小。

于 2015-09-17T18:26:40.840 回答
-17

如果您喜欢函数式,这是一个用 Scala 编写的函数,展示了如何仅使用 vals(而不是 vars)将输入流复制到输出流。

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
  val buffer = new Array[Byte](bufferSize);
  def recurse() {
    val len = inputStream.read(buffer);
    if (len > 0) {
      outputStream.write(buffer.take(len));
      recurse();
    }
  }
  recurse();
}

请注意,这不建议在可用内存很少的 java 应用程序中使用,因为使用递归函数很容易得到堆栈溢出异常错误

于 2013-03-20T12:46:38.237 回答