这是RX方式。这个扩展将把一个 uri 的流转换成一个流:
public static IObservable<Stream> RequestToStream(this IObservable<string> source,
TimeSpan timeout)
{
return
from wc in source.Select(WebRequest.Create)
from s in Observable
.FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
wc.EndGetResponse)()
.Timeout(timeout, Observable.Empty<WebResponse>())
.Catch(Observable.Empty<WebResponse>())
select s.GetResponseStream();
}
用法:
new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Do(stream = > ProcessStream(stream))
.Subscribe();
编辑:哎呀,没有注意到文件写入序列化要求。这部分可以通过使用 .Concat 来完成,它本质上是一个 RX 队列(另一个是 .Zip)
让我们有一个 .StreamToFile 扩展名:
public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
{
return Observable.Defer(() =>
source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}
现在您可以并行处理 Web 请求,但可以序列化来自它们的文件写入:
new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
.Select(x => x.StreamToFile())
.Concat()
.Subscribe();