16

我正在使用一个库,该库需要我提供一个实现此接口的对象:

public interface IConsole {
    TextWriter StandardInput { get; }
    TextReader StandardOutput { get; }
    TextReader StandardError { get; }
}

该对象的读者然后被图书馆使用:

IConsole console = new MyConsole();
int readBytes = console.StandardOutput.Read(buffer, 0, buffer.Length);

通常,实现 IConsole 的类具有来自外部进程的 StandardOutput 流。在这种情况下,console.StandardOutput.Read 调用通过阻塞工作,直到有一些数据写入 StandardOutput 流。

我正在尝试做的是创建一个测试IConsole 实现,它使用MemoryStreams 并将StandardInput 上出现的任何内容回显到StandardInput。我试过:

MemoryStream echoOutStream = new MemoryStream();
StandardOutput = new StreamReader(echoOutStream);

但问题在于 console.StandardOutput.Read 将返回 0 而不是阻塞,直到有一些数据。如果没有可用数据或者我可以使用不同的内存流,我是否可以让 MemoryStream 阻塞?

4

5 回答 5

13

受您的回答启发,这是我的多线程、多写版本:

public class EchoStream : MemoryStream
{
    private readonly ManualResetEvent _DataReady = new ManualResetEvent(false);
    private readonly ConcurrentQueue<byte[]> _Buffers = new ConcurrentQueue<byte[]>();

    public bool DataAvailable{get { return !_Buffers.IsEmpty; }}

    public override void Write(byte[] buffer, int offset, int count)
    {
        _Buffers.Enqueue(buffer);
        _DataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        _DataReady.WaitOne();

        byte[] lBuffer;

        if (!_Buffers.TryDequeue(out lBuffer))
        {
            _DataReady.Reset();
            return -1;
        }

        if (!DataAvailable)
            _DataReady.Reset();

        Array.Copy(lBuffer, buffer, lBuffer.Length);
        return lBuffer.Length;
    }
}

使用您的版本,您应该在写入时读取流,而不能进行任何连续写入。我的版本缓冲 ConcurrentQueue 中的任何写入缓冲区(将其更改为简单队列并锁定它相当简单)

于 2013-10-02T12:54:28.713 回答
10

最后,我找到了一种简单的方法,即从 MemoryStream 继承并接管 Read 和 Write 方法。

public class EchoStream : MemoryStream {

    private ManualResetEvent m_dataReady = new ManualResetEvent(false);
    private byte[] m_buffer;
    private int m_offset;
    private int m_count;

    public override void Write(byte[] buffer, int offset, int count) {
        m_buffer = buffer;
        m_offset = offset;
        m_count = count;
        m_dataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count) {
        if (m_buffer == null) {
            // Block until the stream has some more data.
            m_dataReady.Reset();
            m_dataReady.WaitOne();    
        }

        Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, (count < m_count) ? count : m_count);
        m_buffer = null;
        return (count < m_count) ? count : m_count;
    }
}
于 2009-09-28T00:52:47.287 回答
4

我将添加一个更精致的 EchoStream 版本。这是其他两个版本的组合,加上评论中的一些建议。

更新- 我已经用超过 50 TB 的数据连续几天测试了这个 EchoStream。测试让它位于网络流和 ZStandard 压缩流之间。异步也经过了测试,这带来了一种罕见的挂起状态。似乎内置 System.IO.Stream 不希望在同一流上同时调用 ReadAsync 和 WriteAsync,如果没有任何可用数据,这可能会导致它挂起,因为这两个调用使用相同的内部变量。因此我不得不重写这些函数,从而解决了挂起的问题。

此版本具有以下增强功能:

  1. 这是使用 System.IO.Stream 基类而不是 MemoryStream 从头开始​​编写的。

  2. 构造函数可以设置最大队列深度,如果达到此级别,则流写入将阻塞,直到执行读取,将队列深度降至最大级别以下(无限制=0,默认值=10)。

  3. 读取/写入数据时,缓冲区偏移量和计数现在得到尊重。此外,您可以使用比 Write 更小的缓冲区调用 Read,而不会引发异常或丢失数据。BlockCopy 在循环中用于填充字节,直到满足计数。

  4. 有一个名为 AlwaysCopyBuffer 的公共属性,它在 Write 函数中制作缓冲区的副本。将此设置为 true 将安全地允许在调用 Write 后重用字节缓冲区。

  5. 有一个名为 ReadTimeout/WriteTimeout 的公共属性,它控制 Read/Write 函数在返回 0 之前将阻塞多长时间(默认值=Infinite,-1)。

  6. 使用了 BlockingCollection<> 类,它在底层结合了 ConcurrentQueue 和 AutoResetEvent 类。最初我正在使用这两个类,但存在一种罕见的情况,您会发现在数据被 Enqueued() 之后,当 AutoResetEvent 允许 Read() 中的线程通过时,它不会立即可用。这种情况大约每通过 500GB 的数据就会发生一次。解决方法是休眠并再次检查数据。有时 Sleep(0) 有效,但在 CPU 使用率很高的极端情况下,在数据出现之前它可能高达 Sleep(1000)。在我切换到 BlockingCollection<> 之后,它有很多额外的代码可以优雅地处理这个问题并且没有问题。

  7. 这已经过测试,对于同时异步读写来说是线程安全的。

    使用系统;使用 System.IO;使用 System.Threading.Tasks;使用 System.Threading;使用 System.Collections.Concurrent;

    公共类 EchoStream :流 { 公共覆盖布尔 CanTimeout { 获取;} = 真;公共覆盖 int ReadTimeout { get; 放; } = 超时。无限;公共覆盖 int WriteTimeout { get; 放; } = 超时。无限;公共覆盖 bool CanRead { get; } = 真;公共覆盖 bool CanSeek { get; } = 假;公共覆盖 bool CanWrite { get; } = 真;

     public bool CopyBufferOnWrite { get; set; } = false;
    
     private readonly object _lock = new object();
    
     // Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
     private readonly BlockingCollection<byte[]> _Buffers;
     private int _maxQueueDepth = 10;
    
     private byte[] m_buffer = null;
     private int m_offset = 0;
     private int m_count = 0;
    
     private bool m_Closed = false;
     private bool m_FinalZero = false; //after the stream is closed, set to true after returning a 0 for read()
     public override void Close()
     {
         m_Closed = true;
    
         // release any waiting writes
         _Buffers.CompleteAdding();
     }
    
     public bool DataAvailable
     {
         get
         {
             return _Buffers.Count > 0;
         }
     }
    
     private long _Length = 0L;
     public override long Length
     {
         get
         {
             return _Length;
         }
     }
    
     private long _Position = 0L;
     public override long Position
     {
         get
         {
             return _Position;
         }
         set
         {
             throw new NotImplementedException();
         }
     }
    
     public EchoStream() : this(10)
     {
     }
    
     public EchoStream(int maxQueueDepth)
     {
         _maxQueueDepth = maxQueueDepth;
         _Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
     }
    
     // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
     public new Task WriteAsync(byte[] buffer, int offset, int count)
     {
         return Task.Run(() => Write(buffer, offset, count));
     }
    
     // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
     public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
     {
         return Task.Run(() =>
         {
             return Read(buffer, offset, count);
         });
     }
    
     public override void Write(byte[] buffer, int offset, int count)
     {
         if (m_Closed || buffer.Length - offset < count || count <= 0)
             return;
    
         byte[] newBuffer;
         if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
             newBuffer = buffer;
         else
         {
             newBuffer = new byte[count];
             System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
         }
         if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
             throw new TimeoutException("EchoStream Write() Timeout");
    
         _Length += count;
     }
    
     public override int Read(byte[] buffer, int offset, int count)
     {
         if (count == 0)
             return 0;
         lock (_lock)
         {
             if (m_count == 0 && _Buffers.Count == 0)
             {
                 if (m_Closed)
                 {
                     if (!m_FinalZero)
                     {
                         m_FinalZero = true;
                         return 0;
                     }
                     else
                     {
                         return -1;
                     }
                 }
    
                 if (_Buffers.TryTake(out m_buffer, ReadTimeout))
                 {
                     m_offset = 0;
                     m_count = m_buffer.Length;
                 }
                 else
                 {
                     if (m_Closed)
                     {
                         if (!m_FinalZero)
                         {
                             m_finalZero = true;
                             return 0;
                         }
                         else
                         {
                             return -1;
                         }
                     }
                     else
                     {
                         return 0;
                     }
                 }
             }
    
             int returnBytes = 0;
             while (count > 0)
             {
                 if (m_count == 0)
                 {
                     if (_Buffers.TryTake(out m_buffer, 0))
                     {
                         m_offset = 0;
                         m_count = m_buffer.Length;
                     }
                     else
                         break;
                 }
    
                 var bytesToCopy = (count < m_count) ? count : m_count;
                 System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
                 m_offset += bytesToCopy;
                 m_count -= bytesToCopy;
                 offset += bytesToCopy;
                 count -= bytesToCopy;
    
                 returnBytes += bytesToCopy;
             }
    
             _Position += returnBytes;
    
             return returnBytes;
         }
     }
    
     public override int ReadByte()
     {
         byte[] returnValue = new byte[1];
         return (Read(returnValue, 0, 1) <= 0 ? -1 : (int)returnValue[0]);
     }
    
     public override void Flush()
     {
     }
    
     public override long Seek(long offset, SeekOrigin origin)
     {
         throw new NotImplementedException();
     }
    
     public override void SetLength(long value)
     {
         throw new NotImplementedException();
     }
    

    }

于 2018-12-05T06:57:29.653 回答
3

匿名管道流像文件流一样阻塞,应该比提供的示例代码处理更多的边缘情况。

这是一个演示此行为的单元测试。

var cts = new CancellationTokenSource();
using (var pipeServer = new AnonymousPipeServerStream(PipeDirection.Out))
using (var pipeStream = new AnonymousPipeClientStream(PipeDirection.In, pipeServer.ClientSafePipeHandle))
{
    var buffer = new byte[1024];
    var readTask = pipeStream.ReadAsync(buffer, 0, buffer.Length, cts.Token);
    Assert.IsFalse(readTask.IsCompleted, "Read already complete");

    // Cancelling does NOT unblock the read
    cts.Cancel();
    Assert.IsFalse(readTask.IsCanceled, "Read cancelled");

    // Only sending data does
    pipeServer.WriteByte(42);
    var bytesRead = await readTask;
    Assert.AreEqual(1, bytesRead);
}
于 2020-08-06T17:37:48.757 回答
0

这是我对上面发布的 EchoStream 的看法。它处理写入和读取的偏移和计数参数。

public class EchoStream : MemoryStream
{
    private readonly ManualResetEvent _DataReady = new ManualResetEvent(false);
    private readonly ConcurrentQueue<byte[]> _Buffers = new ConcurrentQueue<byte[]>();

    public bool DataAvailable { get { return !_Buffers.IsEmpty; } }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _Buffers.Enqueue(buffer.Skip(offset).Take(count).ToArray());
        _DataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        _DataReady.WaitOne();

        byte[] lBuffer;

        if (!_Buffers.TryDequeue(out lBuffer))
        {
            _DataReady.Reset();
            return -1;
        }

        if (!DataAvailable)
            _DataReady.Reset();

        Array.Copy(lBuffer, 0, buffer, offset, Math.Min(lBuffer.Length, count));
        return lBuffer.Length;
    }
}

我能够使用此类对 System.IO.Pipelines 实现进行单元测试。我需要一个可以连续模拟多个读取调用而不会到达流末尾的 MemoryStream。

于 2021-03-25T18:25:52.653 回答