24

我有一个System.IO.Stream不可搜索的只读实现(并且它Position总是返回 0)。我需要将它发送给Seek在流上执行一些操作(也就是设置位置)的消费者。这不是一个巨大的搜索——比如从当前位置开始+/- 100。是否有现有的Stream包装器可以为流添加缓冲能力以进行简单的 Seek 操作?

更新:我应该补充一点,我的消费者是 NAudio Mp3FileReader。我真的只需要一种播放(缓慢且无限期地)流式 MP3 的方法。我认为这是一个错误,NAudio 期望能够随意寻找他们的数据源。

4

6 回答 6

21

向前寻找很容易(只需阅读),但没有缓冲就无法向后寻找。也许只是:

using(var ms = new MemoryStream()) {
    otherStream.CopyTo(ms);
    ms.Position = 0;
    // now work with ms
}

但是,这仅适用于已知结束(不需要执行哪些流)的中小型流(不是 GB)。如果您需要更大的流,FileStream则可以使用临时文件,但 IO 密集度要高得多。

于 2012-10-23T17:27:17.837 回答
19

这是一个包装器,可以使任何Stream可搜索的读取操作。

它通过缓存来自底层流的读取来工作,最多缓存构造函数中指定的字节数。当内存限制禁止 Marc Gravell 的解决方案时,这将派上用场。

支持的查找操作:

  • 寻求前向使用SeekOrigin.CurrentSeekOrigin.Begin适用于任意偏移量
  • 使用SeekOrigin.Current并从底层流中的当前位置SeekOrigin.Begin向下搜索-seekBackBufferSize字节(这可能与readSeekableStream.Position先前的向后搜索后不同)
  • 寻求使用SeekOrigin.End作品offset >= -seekBackBufferSize && offset <= 0

一般说明

  • Seek方法和属性完全在内部Position处理,不涉及底层流(无论如何只会抛出)
  • 搜索只影响流的读取部分,因此类的名称
  • 所有写操作都简单地委托给底层流
  • 用 this 包装已经可搜索的流将浪费资源
  • 下面解决的一些问题ReadSeekableStream也可以由我的PeekableStream班级解决

这个实现是新鲜的,还没有经过实战。然而,我已经对很多搜索/阅读案例和极端案例进行了单元测试,并将其与 (freely seekable) 进行了交叉比较MemoryStream

public class ReadSeekableStream : Stream
{
    private long _underlyingPosition;
    private readonly byte[] _seekBackBuffer;
    private int _seekBackBufferCount;
    private int _seekBackBufferIndex;
    private readonly Stream _underlyingStream;

    public ReadSeekableStream(Stream underlyingStream, int seekBackBufferSize)
    {
        if (!underlyingStream.CanRead)
            throw new Exception("Provided stream " + underlyingStream + " is not readable");
        _underlyingStream = underlyingStream;
        _seekBackBuffer = new byte[seekBackBufferSize];
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return true; } }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int copiedFromBackBufferCount = 0;
        if (_seekBackBufferIndex < _seekBackBufferCount)
        {
            copiedFromBackBufferCount = Math.Min(count, _seekBackBufferCount - _seekBackBufferIndex);
            Buffer.BlockCopy(_seekBackBuffer, _seekBackBufferIndex, buffer, offset, copiedFromBackBufferCount);
            offset += copiedFromBackBufferCount;
            count -= copiedFromBackBufferCount;
            _seekBackBufferIndex += copiedFromBackBufferCount;
        }
        int bytesReadFromUnderlying = 0;
        if (count > 0)
        {
            bytesReadFromUnderlying = _underlyingStream.Read(buffer, offset, count);
            if (bytesReadFromUnderlying > 0)
            {
                _underlyingPosition += bytesReadFromUnderlying;

                var copyToBufferCount = Math.Min(bytesReadFromUnderlying, _seekBackBuffer.Length);
                var copyToBufferOffset = Math.Min(_seekBackBufferCount, _seekBackBuffer.Length - copyToBufferCount);
                var bufferBytesToMove = Math.Min(_seekBackBufferCount - 1, copyToBufferOffset);

                if (bufferBytesToMove > 0)
                    Buffer.BlockCopy(_seekBackBuffer, _seekBackBufferCount - bufferBytesToMove, _seekBackBuffer, 0, bufferBytesToMove);
                Buffer.BlockCopy(buffer, offset, _seekBackBuffer, copyToBufferOffset, copyToBufferCount);
                _seekBackBufferCount = Math.Min(_seekBackBuffer.Length, _seekBackBufferCount + copyToBufferCount);
                _seekBackBufferIndex = _seekBackBufferCount;
            }
        }
        return copiedFromBackBufferCount + bytesReadFromUnderlying;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        if (origin == SeekOrigin.End) 
            return SeekFromEnd((int) Math.Max(0, -offset));

        var relativeOffset = origin == SeekOrigin.Current
            ? offset
            : offset - Position;

        if (relativeOffset == 0)
            return Position;
        else if (relativeOffset > 0)
            return SeekForward(relativeOffset);
        else
            return SeekBackwards(-relativeOffset);
    }

    private long SeekForward(long origOffset)
    {
        long offset = origOffset;
        var seekBackBufferLength = _seekBackBuffer.Length;

        int backwardSoughtBytes = _seekBackBufferCount - _seekBackBufferIndex;
        int seekForwardInBackBuffer = (int) Math.Min(offset, backwardSoughtBytes);
        offset -= seekForwardInBackBuffer;
        _seekBackBufferIndex += seekForwardInBackBuffer;

        if (offset > 0)
        {
            // first completely fill seekBackBuffer to remove special cases from while loop below
            if (_seekBackBufferCount < seekBackBufferLength)
            {
                var maxRead = seekBackBufferLength - _seekBackBufferCount;
                if (offset < maxRead)
                    maxRead = (int) offset;
                var bytesRead = _underlyingStream.Read(_seekBackBuffer, _seekBackBufferCount, maxRead);
                _underlyingPosition += bytesRead;
                _seekBackBufferCount += bytesRead;
                _seekBackBufferIndex = _seekBackBufferCount;
                if (bytesRead < maxRead)
                {
                    if (_seekBackBufferCount < offset)
                        throw new NotSupportedException("Reached end of stream seeking forward " + origOffset + " bytes");
                    return Position;
                }
                offset -= bytesRead;
            }

            // now alternate between filling tempBuffer and seekBackBuffer
            bool fillTempBuffer = true;
            var tempBuffer = new byte[seekBackBufferLength];
            while (offset > 0)
            {
                var maxRead = offset < seekBackBufferLength ? (int) offset : seekBackBufferLength;
                var bytesRead = _underlyingStream.Read(fillTempBuffer ? tempBuffer : _seekBackBuffer, 0, maxRead);
                _underlyingPosition += bytesRead;
                var bytesReadDiff = maxRead - bytesRead;
                offset -= bytesRead;
                if (bytesReadDiff > 0 /* reached end-of-stream */ || offset == 0) 
                {
                    if (fillTempBuffer)
                    {
                        if (bytesRead > 0)
                        {
                            Buffer.BlockCopy(_seekBackBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
                            Buffer.BlockCopy(tempBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
                        }
                    }
                    else
                    {
                        if (bytesRead > 0)
                            Buffer.BlockCopy(_seekBackBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
                        Buffer.BlockCopy(tempBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
                    }
                    if (offset > 0)
                        throw new NotSupportedException("Reached end of stream seeking forward " + origOffset + " bytes");
                }
                fillTempBuffer = !fillTempBuffer;
            }
        }
        return Position;
    }

    private long SeekBackwards(long offset)
    {
        var intOffset = (int)offset;
        if (offset > int.MaxValue || intOffset > _seekBackBufferIndex)
            throw new NotSupportedException("Cannot currently seek backwards more than " + _seekBackBufferIndex + " bytes");
        _seekBackBufferIndex -= intOffset;
        return Position;
    }

    private long SeekFromEnd(long offset)
    {
        var intOffset = (int) offset;
        var seekBackBufferLength = _seekBackBuffer.Length;
        if (offset > int.MaxValue || intOffset > seekBackBufferLength)
            throw new NotSupportedException("Cannot seek backwards from end more than " + seekBackBufferLength + " bytes");

        // first completely fill seekBackBuffer to remove special cases from while loop below
        if (_seekBackBufferCount < seekBackBufferLength)
        {
            var maxRead = seekBackBufferLength - _seekBackBufferCount;
            var bytesRead = _underlyingStream.Read(_seekBackBuffer, _seekBackBufferCount, maxRead);
            _underlyingPosition += bytesRead;
            _seekBackBufferCount += bytesRead;
            _seekBackBufferIndex = Math.Max(0, _seekBackBufferCount - intOffset);
            if (bytesRead < maxRead)
            {
                if (_seekBackBufferCount < intOffset)
                    throw new NotSupportedException("Could not seek backwards from end " + intOffset + " bytes");
                return Position;
            }
        }
        else
        {
            _seekBackBufferIndex = _seekBackBufferCount;
        }

        // now alternate between filling tempBuffer and seekBackBuffer
        bool fillTempBuffer = true;
        var tempBuffer = new byte[seekBackBufferLength];
        while (true)
        {
            var bytesRead = _underlyingStream.Read(fillTempBuffer ? tempBuffer : _seekBackBuffer, 0, seekBackBufferLength);
            _underlyingPosition += bytesRead;
            var bytesReadDiff = seekBackBufferLength - bytesRead;
            if (bytesReadDiff > 0) // reached end-of-stream
            {
                if (fillTempBuffer)
                {
                    if (bytesRead > 0)
                    {
                        Buffer.BlockCopy(_seekBackBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
                        Buffer.BlockCopy(tempBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
                    }
                }
                else
                {
                    if (bytesRead > 0)
                        Buffer.BlockCopy(_seekBackBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
                    Buffer.BlockCopy(tempBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
                }
                _seekBackBufferIndex -= intOffset;
                return Position;
            }
            fillTempBuffer = !fillTempBuffer;
        }
    }

    public override long Position
    {
        get { return _underlyingPosition - (_seekBackBufferCount - _seekBackBufferIndex); }
        set { Seek(value, SeekOrigin.Begin); }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            _underlyingStream.Close();
        base.Dispose(disposing);
    }

    public override bool CanTimeout { get { return _underlyingStream.CanTimeout; } }
    public override bool CanWrite { get { return _underlyingStream.CanWrite; } }
    public override long Length { get { return _underlyingStream.Length; } }
    public override void SetLength(long value) { _underlyingStream.SetLength(value); }
    public override void Write(byte[] buffer, int offset, int count) { _underlyingStream.Write(buffer, offset, count); }
    public override void Flush() { _underlyingStream.Flush(); }
}
于 2015-01-20T01:29:53.550 回答
1

另一种解决方案可能是创建自己的流类来包装另一个流。将 Seek 实施为 NOP。

class MyStream : Stream
{
    public MyStream(Stream baseStream) { this.baseStream = baseStream; }
    private Stream baseStream;

    // Delegate all operations except Seek/CanSeek to baseStream

    public override bool CanSeek { get { return true; } }
    public override long Seek(long offset, SeekOrigin origin) { return baseStream.Position; }
}

如果玩家没有充分的理由寻求,这可能会奏效。

于 2012-10-23T19:23:03.793 回答
1

我只是使用 Amazon SDK 中的MakeStreamSeekable方法:


MakeStreamSeekable 方法(输入)

将不可搜索的流转换为 System.IO.MemoryStream。MemoryStream 的位置可以任意移动。

声明语法

C#

public static Stream MakeStreamSeekable(
    Stream input
)

参数

*input* ([Stream][2]) 要转换的流

返回值

可搜索的 MemoryStream

评论

MemoryStreams 使用字节数组作为它们的后备存储。请谨慎使用,因为一个非常大的流很可能会导致系统资源被用完。

于 2018-11-27T15:12:47.610 回答
1

Microsoft BizTalk 实现了一个可搜索的流。这个概念是它是一个具有输入流和缓冲流的流。输入流是不可搜索的流,而缓冲流是您想要使用的任何可以充当缓冲区的流类型。因此,任何时候您提前搜索或读取它都会将数据从输入流复制到缓冲区。每当您回溯并阅读您已经阅读过的内容时,它都会从缓冲区中读取。

知道输入流是否可搜索并在可以时跳过使用缓冲区是足够聪明的。

有很多地方可以找到代码。其中之一在这里,我已经包括在下面。该代码确实依赖于缓冲区流的 VirtualStream,它将使用内存,直到缓冲区太大然后使用磁盘。您也可以获取该代码,或者轻松删除依赖项。

//---------------------------------------------------------------------
// File: SeekableReadOnlyStream.cs
// 
// Summary: A sample pipeline component which demonstrates how to promote message context
//          properties and write distinguished fields for XML messages using arbitrary
//          XPath expressions.
//
// Sample: Arbitrary XPath Property Handler Pipeline Component SDK 
//
//---------------------------------------------------------------------
// This file is part of the Microsoft BizTalk Server 2006 SDK
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// This source code is intended only as a supplement to Microsoft BizTalk
// Server 2006 release and/or on-line documentation. See these other
// materials for detailed information regarding Microsoft code samples.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, WHETHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//---------------------------------------------------------------------

using System;
using System.IO;
using System.Diagnostics;

namespace Microsoft.Samples.BizTalk.Adapter.Tcp
{
    /// <summary>
    /// Implements a seekable read-only stream which uses buffering if
    /// underlying stream is not seekable. Buffer in memory has size
    /// threshold and overflows to disk (temporary file) if number of bytes.
    /// </summary>
    public class SeekableReadOnlyStream : Stream
    {
        /// <summary>
        /// Initializes a SeekableReadOnlyStream instance with base stream and 
        /// buffering stream.
        /// </summary>
        /// <param name="baseStream">Base stream</param>
        /// <param name="overflowStream">Buffering stream</param>
        public SeekableReadOnlyStream(Stream baseStream, Stream bufferingStream)
        {
            if (null == baseStream)
                throw new ArgumentNullException("baseStream");
            if (null == bufferingStream)
                throw new ArgumentNullException("bufferingStream");
            
            // Sanity check - make sure that buffering stream is seekable
            if (!bufferingStream.CanSeek)
                throw new NotSupportedException("Buffering stream must be seekable");

            this.baseStream = baseStream;
            this.bufferingStream = bufferingStream;
        }

        /// <summary>
        /// Initializes a SeekableReadOnlyStream instance with base stream and inherently uses
        /// VirtualStream instance as buffering stream.
        /// </summary>
        /// <param name="baseStream">Base stream</param>
        public SeekableReadOnlyStream(Stream baseStream) : this(baseStream, new VirtualStream())
        {
            // Empty
        }

        /// <summary>
        /// Initializes a SeekableReadOnlyStream instance with base stream and buffer size, and 
        /// inherently uses VirtualStream instance as buffering stream.
        /// </summary>
        /// <param name="baseStream">Base stream</param>
        /// <param name="bufferSize">Buffer size</param>
        public SeekableReadOnlyStream(Stream baseStream, int bufferSize) : this(baseStream, new VirtualStream(bufferSize))
        {
            // Empty
        }

        /// <summary>
        /// Gets a flag indicating whether this stream can be read.
        /// </summary>
        public override bool CanRead
        {
            get { return true; }
        }

        /// <summary>
        /// Gets a flag indicating whether this stream can be written to.
        /// </summary>
        public override bool CanWrite
        {
            get { return false; }
        }

        public override bool CanSeek
        {
            get { return true; }
        }

        /// <summary>
        /// Gets or sets a stream position.
        /// </summary>
        public override long Position
        {
            get 
            { 
                // Check if base stream is seekable
                if (baseStream.CanSeek)
                    return baseStream.Position;

                return bufferingStream.Position; 
            }
            set
            {
                // Check if base stream is seekable
                if (baseStream.CanSeek)
                {
                    baseStream.Position = value;
                    return;
                }

                // Check if current position is the same as being set
                if (bufferingStream.Position == value)
                    return;

                // Check if stream position is being set to the value which is in already
                // read to the buffering stream space, i.e. less than current buffering stream
                // position or less than length of the buffering stream
                if (value < bufferingStream.Position || value < bufferingStream.Length)
                {
                    // Just change position in the buffering stream
                    bufferingStream.Position = value;
                }
                else
                {
                    //
                    // Need to read buffer from the base stream from the current position in 
                    // base stream to the position being set and write that buffer to the end
                    // of the buffering stream
                    //

                    // Set position to the last byte in the buffering stream
                    bufferingStream.Seek(0, SeekOrigin.End);

                    // Read buffer from the base stream and write it to the buffering stream
                    // in 4K chunks
                    byte [] buffer = new byte[ 4096 ];
                    long bytesToRead = value - bufferingStream.Position;
                    while (bytesToRead > 0)
                    {
                        // Read to buffer 4K or byteToRead, whichever is less
                        int bytesRead = baseStream.Read(buffer, 0, (int) Math.Min(bytesToRead, buffer.Length));

                        // Check if any bytes were read
                        if (0 == bytesRead)
                            break;
                        
                        // Write read bytes to the buffering stream
                        bufferingStream.Write(buffer, 0, bytesRead);

                        // Decrease bytes to read counter
                        bytesToRead -= bytesRead;
                    }

                    //
                    // Since this stream is not writable, any attempt to point Position beyond the length
                    // of the base stream will not succeed, and buffering stream position will be set to the
                    // last byte in the buffering stream.
                    //
                }
            }
        }

        /// <summary>
        /// Seeks in stream. For this stream can be very expensive because entire base stream 
        /// can be dumped into buffering stream if SeekOrigin.End is used.
        /// </summary>
        /// <param name="offset">A byte offset relative to the origin parameter</param>
        /// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position</param>
        /// <returns>The new position within the current stream</returns>
        public override long Seek(long offset, SeekOrigin origin)
        {
            // Check if base stream is seekable
            if (baseStream.CanSeek)
                return baseStream.Seek(offset, origin);

            if (SeekOrigin.Begin == origin)
            {
                // Just set the absolute position using Position property
                Position = offset;
                return Position;
            }

            if (SeekOrigin.Current == origin)
            {
                // Set the position using current Position property value plus offset
                Position = Position + offset;
                return Position;
            }
            
            if (SeekOrigin.End == origin)
            {
                //
                // Need to read all remaining not read bytes from the base stream to the 
                // buffering stream. We can't use offset here because stream size may not
                // be available because it's not seekable. Then we'll set the position 
                // based on buffering stream size.
                //

                // Set position to the last byte in the buffering stream
                bufferingStream.Seek(0, SeekOrigin.End);

                // Read all remaining bytes from the base stream to the buffering stream
                byte [] buffer = new byte[ 4096 ];
                for (;;)
                {
                    // Read buffer from base stream
                    int bytesRead = baseStream.Read(buffer, 0, buffer.Length);
                    
                    // Break the reading loop if the base stream is exhausted
                    if (0 == bytesRead)
                        break;

                    // Write buffer to the buffering stream
                    bufferingStream.Write(buffer, 0, bytesRead);
                }

                // Now buffering stream size is equal to the base stream size. Set position
                // using begin origin
                Position = bufferingStream.Length - offset;
                return Position;
            }

            throw new NotSupportedException("Not supported SeekOrigin");
        }

        /// <summary>
        /// Gets the length in bytes of the stream. For this stream can be very expensive
        /// because entire base stream will be dumped into buffering stream.
        /// </summary>
        public override long Length
        {
            get
            {
                // Check if base stream is seekable
                if (baseStream.CanSeek)
                    return baseStream.Length;

                // Preserve the current stream position
                long position = Position;

                // Seek to the end of stream
                Seek(0, SeekOrigin.End);

                // Length will be equal to the current position
                long length = Position;

                // Restore the current stream position
                Position = position;

                return length;
            }
        }

        /// <summary>
        /// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
        /// </summary>
        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and (offset + count- 1) replaced by the bytes read from the current source</param>
        /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream</param>
        /// <param name="count">The maximum number of bytes to be read from the current stream</param>
        /// <returns>The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            // Check if base stream is seekable
            if (baseStream.CanSeek)
                return baseStream.Read(buffer, offset, count);

            int bytesReadTotal = 0;

            // Check if buffering stream has some bytes to read, starting from the
            // current position
            if (bufferingStream.Length > bufferingStream.Position)
            {
                // Read available bytes in buffering stream or count bytes to the buffer, whichever is less
                bytesReadTotal = bufferingStream.Read(buffer, offset, (int) Math.Min(bufferingStream.Length - bufferingStream.Position, count));
                
                // Account for bytes read from the buffering stream
                count -= bytesReadTotal;
                offset += bytesReadTotal;
            }

            // Check if we have any more bytes to read
            if (count > 0)
            {
                Debug.Assert(bufferingStream.Position == bufferingStream.Length);

                //
                // At this point, buffering stream has position set to its end. We need to read buffer from
                // the base stream and write it to the buffering stream
                //

                // Read count bytes from the base stream starting from offset
                int bytesRead = baseStream.Read(buffer, offset, count);

                // Check if bytes were really read
                if (bytesRead > 0)
                {
                    // Write number of read bytes to the buffering stream starting from offset in buffer
                    bufferingStream.Write(buffer, offset, bytesRead);
                }

                // Add number of bytes read at this step to the number of totally read bytes
                bytesReadTotal += bytesRead;
            }

            return bytesReadTotal;
        }

        /// <summary>
        /// Writes to stream.
        /// </summary>
        /// <param name="buffer">Buffer to write to stream</param>
        /// <param name="offset">Stream offset to start write from</param>
        /// <param name="count">Number of bytes from buffer to write</param>
        /// <exception cref="NotSupportedException">Is thrown always</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Set stream length.
        /// </summary>
        /// <param name="value">Stream length</param>
        /// <exception cref="NotSupportedException">Is thrown always</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Closes base and buffering streams.
        /// </summary>
        public override void Close()
        {
            // Close underlying streams
            baseStream.Close();
            bufferingStream.Close();
        }

        /// <summary>
        /// Flushes the stream.
        /// </summary>
        public override void Flush()
        {
            // Flush the buffering stream
            bufferingStream.Flush();
        }


        private Stream baseStream;
        private Stream bufferingStream;
    }
}
于 2021-04-30T18:19:37.743 回答
0

如果使用System.Net.WebClient, 而不是 using OpenRead()which 返回Stream使用webClient.DownloadData("https://your.url")来获取字节数组,然后您可以将其转换为MemoryStream. 这是一个例子:

byte[] buffer = client.DownloadData(testBlobFile);
using (var stream = new MemoryStream(buffer))
{
    ... your code using the stream ...
}

显然,这Stream会在创建之前下载所有内容,因此可能会破坏使用Stream.

于 2019-01-07T23:57:25.933 回答