3

我搜索了所有我能找到的与 PipedInputStreams 和 PipedOutputStreams 相关的问题,但没有找到任何可以帮助我的东西。希望这里有人会遇到类似的事情。

背景:

我有一个从任何java.io.InputStream读取数据的类。该类有一个名为hasNext()的方法,它检查给定的 InputStream 是否有数据,如果找到数据则返回 true,否则返回 false。这个 hasNext() 方法与其他 InputStreams 完美配合,但是当我尝试使用 PipedInputStream(从不同线程中的 PipedOutputStream 馈送,封装在下面的 inputSupplier 变量中)时,它挂起。在研究了 hasNext() 方法的工作原理后,我使用以下代码重新创建了问题:

public static void main(String [] args){
    PipedInputStream inputSourceStream = new PipedInputStream(inputSupplier.getOutputStream());
    byte[] input = new byte[4096];
    int bytes_read = inputSourceStream.read(input, 0, 4096);
}

inputSupplier 只是我编写的一个小类的一个实例,它在自己的线程中运行,并带有一个本地 PipedOutputStream 以避免出现死锁。

问题 所以,我的问题是 hasNext() 方法在流上调用 PipedInputStream.read() 方法以确定是否有任何数据要读取。这会导致阻塞读取操作永远不会退出,直到某些数据到达要读取。这意味着如果流为空,我的 hasNext() 函数将永远不会返回 false(或根本不返回)。

免责声明:我知道 available() 方法,但告诉我的只是没有可用的字节,而不是我们处于流的末尾(无论 Stream 的实现可能是什么),所以 read() 是需要检查这一点。

[编辑]我最初使用 PipedInputStream 的全部目的是模拟“突发”数据源。也就是说,我需要一个可以偶尔写入的 Stream,以查看我的 hasNext() 方法是否会在读取 Stream 时检测到该 Stream 上有新数据。如果有更好的方法来做到这一点,那么我会很高兴听到它!

4

2 回答 2

1

我讨厌删除这么老的问题,但这已经接近谷歌结果的顶部,我刚刚为自己找到了一个解决方案:这个循环字节缓冲区公开输入和输出流,当没有数据时读取方法立即返回 -1当下。一点点线程,您的测试类可以完全按照您想要的方式提供数据。

http://ostermiller.org/utils/src/CircularByteBuffer.java.html

编辑

原来我误解了上述类的文档,它只在线程调用read()被中断时返回-1。我对 read 方法做了一个快速修改,它给了我想要的东西(原始代码被注释掉,唯一的新代码是用 anelse替换else if

@Override public int read(byte[] cbuf, int off, int len) throws IOException {
        //while (true){
            synchronized (CircularByteBuffer.this){
                if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot read from a closed InputStream.");
                int available = CircularByteBuffer.this.available();
                if (available > 0){
                    int length = Math.min(len, available);
                    int firstLen = Math.min(length, buffer.length - readPosition);
                    int secondLen = length - firstLen;
                    System.arraycopy(buffer, readPosition, cbuf, off, firstLen);
                    if (secondLen > 0){
                        System.arraycopy(buffer, 0, cbuf, off+firstLen,  secondLen);
                        readPosition = secondLen;
                    } else {
                        readPosition += length;
                    }
                    if (readPosition == buffer.length) {
                        readPosition = 0;
                    }
                    ensureMark();
                    return length;
                //} else if (outputStreamClosed){
                } else {  // << new line of code
                    return -1;
                }
            }
            //try {
            //    Thread.sleep(100);
            //} catch(Exception x){
            //    throw new IOException("Blocking read operation interrupted.");
            //}
        //}
    }

```

于 2016-03-15T15:42:34.560 回答
0

Java SE 6 及更高版本(如果我错了,请纠正我)附带 java.nio 包,该包专为异步 I/O 设计,听起来像您所描述的

于 2012-04-30T14:22:13.767 回答