这是一种TPL 数据流方法。ABufferBlock<FtpClient>
用作FtpClient
对象池。递归枚举采用IEnumerable<string>
保存一个文件路径段的类型参数。在构建本地和远程文件路径时,这些段的组合方式不同。作为调用递归枚举的副作用,远程文件的路径被发送到一个ActionBlock<IEnumerable<string>>
. 该块处理文件的并行下载。它的Completion
属性最终包含了整个操作过程中可能发生的所有异常。
public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
string targetDirectory, string username = null, string password = null,
int maximumConnections = 1)
{
// Arguments validation omitted
if (!Directory.Exists(targetDirectory))
throw new DirectoryNotFoundException(targetDirectory);
var fsLocker = new object();
var ftpClientPool = new BufferBlock<FtpClient>();
async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
{
var client = await ftpClientPool.ReceiveAsync();
try { return await action(client); }
finally { ftpClientPool.Post(client); } // Return to the pool
}
var downloader = new ActionBlock<IEnumerable<string>>(async path =>
{
var remotePath = String.Join("/", path);
var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
var localDir = Path.GetDirectoryName(localPath);
lock (fsLocker) Directory.CreateDirectory(localDir);
var status = await UsingFtpAsync(client =>
client.DownloadFileAsync(localPath, remotePath));
if (status == FtpStatus.Failed) throw new InvalidOperationException(
$"Download of '{remotePath}' failed.");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = maximumConnections,
BoundedCapacity = maximumConnections,
});
async Task Recurse(IEnumerable<string> path)
{
if (downloader.Completion.IsCompleted) return; // The downloader has failed
var listing = await UsingFtpAsync(client =>
client.GetListingAsync(String.Join("/", path)));
foreach (var item in listing)
{
if (item.Type == FtpFileSystemObjectType.Directory)
{
if (item.Size != 0) await Recurse(path.Append(item.Name));
}
else if (item.Type == FtpFileSystemObjectType.File)
{
var accepted = await downloader.SendAsync(path.Append(item.Name));
if (!accepted) break; // The downloader has failed
}
}
}
// Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
return Task.Run(async () =>
{
// Fill the FtpClient pool
for (int i = 0; i < maximumConnections; i++)
{
var client = new FtpClient(ftpHost);
if (username != null && password != null)
client.Credentials = new NetworkCredential(username, password);
ftpClientPool.Post(client);
}
try
{
// Enumerate the files to download
await Recurse(new[] { ftpRoot });
downloader.Complete();
}
catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }
try
{
// Await the downloader to complete
await downloader.Completion;
}
catch (OperationCanceledException)
when (downloader.Completion.IsCanceled) { throw; }
catch { downloader.Completion.Wait(); } // Propagate AggregateException
finally
{
// Clean up
if (ftpClientPool.TryReceiveAll(out var clients))
foreach (var client in clients) client.Dispose();
}
});
}
使用示例:
await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
"username", "password", maximumConnections: 10);
注意:上面的实现是按照下载过程的节奏懒惰地枚举远程目录。如果您喜欢急切地枚举它,尽快收集有关远程列表的所有可用信息,只需从下载文件中删除BoundedCapacity = maximumConnections
配置。ActionBlock
请注意,这样做可能会导致高内存消耗,以防远程目录具有较深的子文件夹层次结构,累积包含大量小文件。