2

在我之前的问题中,有人评论说可以同时将一个 .NET 流写入另一个流。我只能找到一种方法,但使用一个临时变量(缓冲区)来存储输入流的内容(一次全部或部分)。

  1. 有什么方法可以在不使用缓冲区的情况下做到这一点?
  2. 有什么办法可以同时进行吗?
4

2 回答 2

5

假设流支持正确的异步访问,使用缓冲区可以异步写入两个流。这实际上是并发的 - 尽管您将有一项棘手的工作来保持所有缓冲区同步,因为一个人可以在另一个缓冲区完成写入之前很长时间完成写入第一个缓冲区。

据我所知,如果不使用至少一个字节的缓冲区,就无法从一个(一般)流复制到另一个流。(当然,只使用一个字节会非常低效。)

于 2009-02-26T19:38:26.803 回答
4

如果您乐于将并发性限制为最慢流的速率,那么这非常简单并且不需要额外的缓冲区,我是从内存中编写的,所以可能会有点混乱。

它也没有实现所有需要的覆盖,为了简单起见,CanRead、CanSeek 等都被跳过了,只是完成了核心并发写入。

using System;
using System.IO;
using System.Threading;

namespace MultiCastStream
{   
  public class MultiWriteStream : Stream
  {
    private readonly Stream[] streams;      
    private AutoResetEvent[] waits;
    private readonly IAsyncResult[] results;

    public MultiWriteStream(params Stream[] streams)
    {
      this.streams = (Stream[])streams.Clone();
      this.results = new IAsyncResult[this.streams.Length];
    }

    private void prepWaits()
    {
      if (waits == null)
      {
        this.waits = new AutoResetEvent[this.streams.Length];
        for(int i= 0; i < this.waits.Length; i++)
        {
          this.waits[i] = new AutoResetEvent(false);
        }
      }
      else
      {
        for(int i= 0; i < this.waits.Length; i++)
        {
          this.waits[i].Reset();
        }
      }
    }

    public override void Write (byte[] buffer, int offset, int count)
    {
      prepWaits();
      for( int i = 0; i < this.streams.Length; i++)
      {
        this.results[i] = streams[i].BeginWrite(
          buffer, offset, count, this.Release, waits[i]);   
      }
      WaitHandle.WaitAll(waits);
      for( int i = 0; i < this.streams.Length; i++)
      {
        this.results[i] = this.streams[i].EndWrite(results[i]); 
      }
    }

    private void Release(IAsyncResult result)
    {
      ((AutoResetEvent)result.AsyncState).Set();
    }

    public override void WriteByte (byte value)
    {
      // no point doing this asynchronously
      foreach (Stream s in this.streams) 
      s.WriteByte (value);
    }

    protected override void Dispose (bool disposing)
    {
      base.Dispose (disposing);
      if (this.waits != null)
      {
        foreach (AutoResetEvent w in this.waits)
          w.Close();
      }
      foreach (Stream s in this.streams)
        s.Dispose();
    }       
  }
}
于 2009-02-26T20:56:19.113 回答