9

有没有人对在 Java 中创建一个既是 InputStream 又是 OutputStream 的 Pipe 对象有什么好的建议,因为Java 没有多重继承,并且两个流都是抽象类而不是接口?

潜在的需求是拥有一个可以传递给需要 InputStream 或 OutputStream 的东西的单一对象,以将输出从一个线程传递到另一个线程的输入。

4

7 回答 7

8

似乎错过了这个问题的重点。如果我理解正确,您需要一个对象在一个线程中像 InputStream 一样工作,而在另一个线程中像 OutputStream 一样创建两个线程之间的通信方式。

也许一个答案是使用组合而不是继承(无论如何这是推荐的做法)。使用 getInputStream() 和 getOutputStream() 方法创建一个包含相互连接的 PipedInputStream 和 PipedOutputStream 的管道。

您不能直接将 Pipe 对象传递给需要流的对象,但您可以传递它的 get 方法的返回值来执行此操作。

那对你有用吗?

于 2008-12-13T23:25:02.990 回答
6

java.io.PipedOutputStream 和 java.io.PipedInputStream 看起来是用于此场景的类。它们被设计为一起用于在线程之间传递数据。

如果你真的想要一些单个对象传递,它需要包含其中的每一个并通过 getter 公开它们。

于 2008-12-13T08:22:12.223 回答
3

我认为这是很常见的事情。看到这个问题。

将 Java InputStream 的内容写入 OutputStream 的简单方法

于 2008-12-13T07:01:04.367 回答
1

你不能创建一个既派生自InputStream又派生的类,OutputStream因为它们不是接口并且它们具有通用方法并且Java不允许多重继承(编译器不知道是否调用InputStream.close()或者OutputStream.close()你是否调用close()你的新对象) .

另一个问题是缓冲区。Java 想要为数据分配一个静态缓冲区(它不会改变)。这意味着当您使用 `java.io.PipedXxxStream' 时,写入数据最终会阻塞,除非您使用两个不同的线程。

所以来自 Apocalisp 的答案是正确的:你必须编写一个复制循环。

我建议您在项目中包含 Apache 的 commons-io,其中包含许多仅用于此类任务的辅助例程(在流、文件、字符串及其所有组合之间复制数据)。

于 2008-12-13T09:23:02.740 回答
1

http://ostermiller.org/utils/CircularBuffer.html

于 2009-05-15T20:04:35.830 回答
0

我必须为与 Servlet 的慢速连接实现一个过滤器,所以基本上我将 servlet 输出流包装到一个 QueueOutputStream 中,它将每个字节(在小缓冲区中)添加到一个队列中,然后将这些小缓冲区输出到第二个输出流,所以在某种程度上,这充当输入/输出流,恕我直言,这比无法很好扩展的 JDK 管道更好,基本上标准 JDK 实现(每个读/写)中有太多的上下文切换,阻塞队列只是非常适合单个生产者/消费者场景:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-19T12:23:21.490 回答
0

最好使用 Pipe 或 ArrayBlockingQueue,我建议您不要使用 PipedInput/OutputStream,因为它们有不好的做法,即使您可以在下面的链接中看到他们已要求弃用,因为这会导致许多问题。

https://bugs.openjdk.java.net/browse/JDK-8223048

对于 BlockingQueue 和 Pipe 这里是一个简单的例子

管道:

Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);

int bytesRead = inChannel.read(buf);

参考:http ://tutorials.jenkov.com/java-nio/pipe.html

阻塞队列:

//Shared class used by threads
public class Buffer {
    // ArrayBlockingQueue
    private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);

    public void get() {
        // retrieve from ArrayBlockingQueue
        try {
            System.out.println("Consumer received - " + blockingQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void put(int data) {
        try {
            // putting in ArrayBlockingQueue
            blockingQueue.put(data);
            System.out.println("Producer produced - " + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
        // Starting two threads
        ExecutorService executorService = null;
        try {
            Buffer buffer = new Buffer();
            executorService = Executors.newFixedThreadPool(2);
            executorService.execute(new Producer(buffer));
            executorService.execute(new Consumer(buffer));
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(executorService != null) {
                executorService.shutdown();
            }
        }
    }

public class Consumer implements Runnable {
    private Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            try {
                buffer.get();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Producer implements Runnable {
    private Buffer buffer;

    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            Random random = new Random();
            int data = random.nextInt(1000);
            buffer.put(data);
        }
    }
}

参考: https ://github.com/kishanjavatrainer/ArrayBlockingQueueDemo/tree/master/ArrayBlockingQueueDemo

于 2020-11-27T00:33:47.737 回答