1

我正在尝试将多个文件从 Silverlight 客户端直接上传到 Amazon S3。用户从标准文件打开对话框中选择文件,我想链接上传,以便它们一次连续发生一个。这可能发生在应用程序中的多个位置,因此我试图将其包装在一个不错的实用程序类中,该实用程序类接受所选文件的 IEnumerable 在上传文件时公开文件的 IObservable,以便 UI 可以相应地响应每个文件完成了。

由于 Silverlight 和 AmazonS3 的所有安全要求,它相当复杂。我将尝试简要解释我的整个环境的上下文,但我已经用一个小型控制台应用程序重现了这个问题,我将把代码发布到下面。

我有一个第 3 方实用程序,可以处理从 Silverlight 上传到 S3 的操作,该实用程序公开了基于标准事件的异步方法。我为每个上传的文件创建该实用程序的一个实例。它创建一个未签名的请求字符串,然后我将其发布到我的服务器以使用我的私钥进行签名。该签名请求通过服务代理类发生,该类也使用基于事件的异步方法。获得签名请求后,我将其添加到上传程序实例并启动上传。

我尝试过使用 Concat,但最终只有第一个文件通过该过程。当我使用 Merge 时,所有文件都可以正常完成,但以并行方式而不是串行方式完成。当我使用 Merge(2) 时,所有文件都从第一步开始,但只有 2 个文件通过并完成。

显然我错过了与 Rx 相关的一些东西,因为它的行为不像我预期的那样。

namespace RxConcat
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Timers;

    public class SignCompletedEventArgs : EventArgs
    {
        public string SignedRequest { get; set; }
    }

    public class ChainUploader
    {
        public IObservable<string> StartUploading(IEnumerable<string> files)
        {
            return files.Select(
                     file => from signArgs in this.Sign(file + "_request")
                             from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest)
                             select file).Concat();
        }

        private IObservable<System.Reactive.EventPattern<SignCompletedEventArgs>> Sign(string request)
        {
            Console.WriteLine("Signing request '" + request + "'");
            var signer = new Signer();
            var source = Observable.FromEventPattern<SignCompletedEventArgs>(ev => signer.SignCompleted += ev, ev => signer.SignCompleted -= ev);
            signer.SignAsync(request);
            return source;
        }

        private IObservable<System.Reactive.EventPattern<EventArgs>> Upload(string file, string signedRequest)
        {
            Console.WriteLine("Uploading file '" + file + "'");
            var uploader = new Uploader();
            var source = Observable.FromEventPattern<EventArgs>(ev => uploader.UploadCompleted += ev, ev => uploader.UploadCompleted -= ev);
            uploader.UploadAsync(file, signedRequest);
            return source;
        }
    }

    public class Signer
    {
        public event EventHandler<SignCompletedEventArgs> SignCompleted;

        public void SignAsync(string request)
        {
            var timer = new Timer(1000);
            timer.Elapsed += (sender, args) =>
            {
                timer.Stop();
                if (this.SignCompleted == null)
                {
                    return;
                }

                this.SignCompleted(this, new SignCompletedEventArgs { SignedRequest = request + "signed" });
            };
            timer.Start();
        }
    }

    public class Uploader
    {
        public event EventHandler<EventArgs> UploadCompleted;

        public void UploadAsync(string file, string signedRequest)
        {
            var timer = new Timer(1000);
            timer.Elapsed += (sender, args) =>
            {
                timer.Stop();
                if (this.UploadCompleted == null)
                {
                    return;
                }

                this.UploadCompleted(this, new EventArgs());
            };
            timer.Start();
        }
    }

    internal class Program
    {
        private static void Main(string[] args)
        {
            var files = new[] { "foo", "bar", "baz" };
            var uploader = new ChainUploader();
            var token = uploader.StartUploading(files).Subscribe(file =>   Console.WriteLine("Upload completed for '" + file + "'"));
            Console.ReadLine();
        }
    }
}
4

1 回答 1

1

为每个文件处理两步上传的基本 observable 永远不会“完成”,这会阻止链中的下一个文件启动。在调用 Concat() 之前向该 observable 添加一个 Limit(1) ,它将正常工作。

return files.Select(file => (from signArgs in this.Sign(file + "_request")
                             from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest)
                             select file).Take(1)).Concat();
于 2012-06-08T18:20:39.450 回答