3

鉴于在 C#、TPL、Parallel Extensions、Async CTP、Reactive Extensions 中执行异步操作的许多新方法,我想知道并行化以下获取和处理部分的最简单方法是什么:

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

附带条件是,虽然可以随时获取ProcessFile文件,但一次只能处理一个文件,并且应该按顺序调用。

简而言之,以流水线方式获取FetchFile和运行的最简单方法是什么ProcessFile,即同时发生?

4

4 回答 4

1

鉴于ProcessFile我会说您应该使用 TPL 异步获取数据,然后将引用预加载数据的令牌排入队列。然后,您可以拥有一个后台线程,该线程将项目从队列中拉出并将它们一一交给 ProcessFile。这是一种生产者/消费者模式。

对于队列,您可以查看BlockingCollection,它可以提供线程安全队列,它还具有能够限制工作负载的良好效果。

于 2011-03-28T14:55:41.867 回答
1

由于我不知道所有花哨的机制,我可能会以老式的方式来做,尽管我怀疑它会被归类为“简单”:

var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}
于 2011-03-28T15:01:50.823 回答
1

这是RX方式。这个扩展将把一个 uri 的流转换成一个流:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

用法:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream = > ProcessStream(stream))
   .Subscribe();

编辑:哎呀,没有注意到文件写入序列化要求。这部分可以通过使用 .Concat 来完成,它本质上是一个 RX 队列(另一个是 .Zip)

让我们有一个 .StreamToFile 扩展名:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

现在您可以并行处理 Web 请求,但可以序列化来自它们的文件写入:

        new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();
于 2011-03-28T17:03:51.997 回答
1

异步实际上并不表示并行。它只是意味着您不会阻止等待另一个操作。但是您可以利用异步 I/O 在下载 URL 时不阻塞线程,也就是说,如果您这样做,则不需要与 url 一样多的线程来并行下载它们:

var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));

基本上,我们为每个 url 创建一个异步下载任务,然后当任何任务完成时,我们调用一个使用普通对象作为 out synclock 的延续,以确保ProcessFile顺序发生。在最后一次继续完成WhenAll之前不会返回。ProcessFile

您可以避免使用 RX 的显式锁定ReplaySubject(但当然它会在内部锁定):

var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
        .Select(download => client.DownloadDataTaskAsync((string) download)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
    ProcessFile(file);
}

在这里,我们使用 aReplaySubject作为我们的文件下载管道。每次下载异步完成并将其结果发布到foreach块所在的管道中(即顺序发生)。当所有任务完成后,我们完成了 observable,它退出了foreach.

于 2011-05-31T15:45:17.303 回答