我正在尝试将多个文件从 Silverlight 客户端直接上传到 Amazon S3。用户从标准文件打开对话框中选择文件,我想链接上传,以便它们一次连续发生一个。这可能发生在应用程序中的多个位置,因此我试图将其包装在一个不错的实用程序类中,该实用程序类接受所选文件的 IEnumerable 在上传文件时公开文件的 IObservable,以便 UI 可以相应地响应每个文件完成了。
由于 Silverlight 和 AmazonS3 的所有安全要求,它相当复杂。我将尝试简要解释我的整个环境的上下文,但我已经用一个小型控制台应用程序重现了这个问题,我将把代码发布到下面。
我有一个第 3 方实用程序,可以处理从 Silverlight 上传到 S3 的操作,该实用程序公开了基于标准事件的异步方法。我为每个上传的文件创建该实用程序的一个实例。它创建一个未签名的请求字符串,然后我将其发布到我的服务器以使用我的私钥进行签名。该签名请求通过服务代理类发生,该类也使用基于事件的异步方法。获得签名请求后,我将其添加到上传程序实例并启动上传。
我尝试过使用 Concat,但最终只有第一个文件通过该过程。当我使用 Merge 时,所有文件都可以正常完成,但以并行方式而不是串行方式完成。当我使用 Merge(2) 时,所有文件都从第一步开始,但只有 2 个文件通过并完成。
显然我错过了与 Rx 相关的一些东西,因为它的行为不像我预期的那样。
namespace RxConcat
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Timers;
public class SignCompletedEventArgs : EventArgs
{
public string SignedRequest { get; set; }
}
public class ChainUploader
{
public IObservable<string> StartUploading(IEnumerable<string> files)
{
return files.Select(
file => from signArgs in this.Sign(file + "_request")
from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest)
select file).Concat();
}
private IObservable<System.Reactive.EventPattern<SignCompletedEventArgs>> Sign(string request)
{
Console.WriteLine("Signing request '" + request + "'");
var signer = new Signer();
var source = Observable.FromEventPattern<SignCompletedEventArgs>(ev => signer.SignCompleted += ev, ev => signer.SignCompleted -= ev);
signer.SignAsync(request);
return source;
}
private IObservable<System.Reactive.EventPattern<EventArgs>> Upload(string file, string signedRequest)
{
Console.WriteLine("Uploading file '" + file + "'");
var uploader = new Uploader();
var source = Observable.FromEventPattern<EventArgs>(ev => uploader.UploadCompleted += ev, ev => uploader.UploadCompleted -= ev);
uploader.UploadAsync(file, signedRequest);
return source;
}
}
public class Signer
{
public event EventHandler<SignCompletedEventArgs> SignCompleted;
public void SignAsync(string request)
{
var timer = new Timer(1000);
timer.Elapsed += (sender, args) =>
{
timer.Stop();
if (this.SignCompleted == null)
{
return;
}
this.SignCompleted(this, new SignCompletedEventArgs { SignedRequest = request + "signed" });
};
timer.Start();
}
}
public class Uploader
{
public event EventHandler<EventArgs> UploadCompleted;
public void UploadAsync(string file, string signedRequest)
{
var timer = new Timer(1000);
timer.Elapsed += (sender, args) =>
{
timer.Stop();
if (this.UploadCompleted == null)
{
return;
}
this.UploadCompleted(this, new EventArgs());
};
timer.Start();
}
}
internal class Program
{
private static void Main(string[] args)
{
var files = new[] { "foo", "bar", "baz" };
var uploader = new ChainUploader();
var token = uploader.StartUploading(files).Subscribe(file => Console.WriteLine("Upload completed for '" + file + "'"));
Console.ReadLine();
}
}
}