有没有人对在 Java 中创建一个既是 InputStream 又是 OutputStream 的 Pipe 对象有什么好的建议,因为Java 没有多重继承,并且两个流都是抽象类而不是接口?
潜在的需求是拥有一个可以传递给需要 InputStream 或 OutputStream 的东西的单一对象,以将输出从一个线程传递到另一个线程的输入。
有没有人对在 Java 中创建一个既是 InputStream 又是 OutputStream 的 Pipe 对象有什么好的建议,因为Java 没有多重继承,并且两个流都是抽象类而不是接口?
潜在的需求是拥有一个可以传递给需要 InputStream 或 OutputStream 的东西的单一对象,以将输出从一个线程传递到另一个线程的输入。
似乎错过了这个问题的重点。如果我理解正确,您需要一个对象在一个线程中像 InputStream 一样工作,而在另一个线程中像 OutputStream 一样创建两个线程之间的通信方式。
也许一个答案是使用组合而不是继承(无论如何这是推荐的做法)。使用 getInputStream() 和 getOutputStream() 方法创建一个包含相互连接的 PipedInputStream 和 PipedOutputStream 的管道。
您不能直接将 Pipe 对象传递给需要流的对象,但您可以传递它的 get 方法的返回值来执行此操作。
那对你有用吗?
java.io.PipedOutputStream 和 java.io.PipedInputStream 看起来是用于此场景的类。它们被设计为一起用于在线程之间传递数据。
如果你真的想要一些单个对象传递,它需要包含其中的每一个并通过 getter 公开它们。
我认为这是很常见的事情。看到这个问题。
你不能创建一个既派生自InputStream
又派生的类,OutputStream
因为它们不是接口并且它们具有通用方法并且Java不允许多重继承(编译器不知道是否调用InputStream.close()
或者OutputStream.close()
你是否调用close()
你的新对象) .
另一个问题是缓冲区。Java 想要为数据分配一个静态缓冲区(它不会改变)。这意味着当您使用 `java.io.PipedXxxStream' 时,写入数据最终会阻塞,除非您使用两个不同的线程。
所以来自 Apocalisp 的答案是正确的:你必须编写一个复制循环。
我建议您在项目中包含 Apache 的 commons-io,其中包含许多仅用于此类任务的辅助例程(在流、文件、字符串及其所有组合之间复制数据)。
我必须为与 Servlet 的慢速连接实现一个过滤器,所以基本上我将 servlet 输出流包装到一个 QueueOutputStream 中,它将每个字节(在小缓冲区中)添加到一个队列中,然后将这些小缓冲区输出到第二个输出流,所以在某种程度上,这充当输入/输出流,恕我直言,这比无法很好扩展的 JDK 管道更好,基本上标准 JDK 实现(每个读/写)中有太多的上下文切换,阻塞队列只是非常适合单个生产者/消费者场景:
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;
public class QueueOutputStream extends OutputStream
{
private static final int DEFAULT_BUFFER_SIZE=1024;
private static final byte[] END_SIGNAL=new byte[]{};
private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
private final byte[] buffer;
private boolean closed=false;
private int count=0;
public QueueOutputStream()
{
this(DEFAULT_BUFFER_SIZE);
}
public QueueOutputStream(final int bufferSize)
{
if(bufferSize<=0){
throw new IllegalArgumentException("Buffer size <= 0");
}
this.buffer=new byte[bufferSize];
}
private synchronized void flushBuffer()
{
if(count>0){
final byte[] copy=new byte[count];
System.arraycopy(buffer,0,copy,0,count);
queue.offer(copy);
count=0;
}
}
@Override
public synchronized void write(final int b) throws IOException
{
if(closed){
throw new IllegalStateException("Stream is closed");
}
if(count>=buffer.length){
flushBuffer();
}
buffer[count++]=(byte)b;
}
@Override
public synchronized void write(final byte[] b, final int off, final int len) throws IOException
{
super.write(b,off,len);
}
@Override
public synchronized void close() throws IOException
{
flushBuffer();
queue.offer(END_SIGNAL);
closed=true;
}
public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
{
return executor.submit(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try{
byte[] buffer=queue.take();
while(buffer!=END_SIGNAL){
outputStream.write(buffer);
buffer=queue.take();
}
outputStream.flush();
} catch(Exception e){
close();
throw e;
} finally{
outputStream.close();
}
return null;
}
}
);
}
最好使用 Pipe 或 ArrayBlockingQueue,我建议您不要使用 PipedInput/OutputStream,因为它们有不好的做法,即使您可以在下面的链接中看到他们已要求弃用,因为这会导致许多问题。
https://bugs.openjdk.java.net/browse/JDK-8223048
对于 BlockingQueue 和 Pipe 这里是一个简单的例子
管道:
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
sinkChannel.write(buf);
}
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
参考:http ://tutorials.jenkov.com/java-nio/pipe.html
阻塞队列:
//Shared class used by threads
public class Buffer {
// ArrayBlockingQueue
private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);
public void get() {
// retrieve from ArrayBlockingQueue
try {
System.out.println("Consumer received - " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void put(int data) {
try {
// putting in ArrayBlockingQueue
blockingQueue.put(data);
System.out.println("Producer produced - " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// Starting two threads
ExecutorService executorService = null;
try {
Buffer buffer = new Buffer();
executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Producer(buffer));
executorService.execute(new Consumer(buffer));
} catch (Exception e) {
e.printStackTrace();
}finally {
if(executorService != null) {
executorService.shutdown();
}
}
}
public class Consumer implements Runnable {
private Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
try {
buffer.get();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Producer implements Runnable {
private Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
Random random = new Random();
int data = random.nextInt(1000);
buffer.put(data);
}
}
}
参考: https ://github.com/kishanjavatrainer/ArrayBlockingQueueDemo/tree/master/ArrayBlockingQueueDemo