0

我正在使用Parallel.ForEach将 C# 中的多个文件从谷歌存储桶下载到文件夹位置。我正在使用重试逻辑,因此它可以在下载过程中文件下载失败的情况下重试下载文件。如何为每个文件或Parallel.ForEach循环中的每个线程应用重试逻辑。

Parallel.ForEach(listFiles, objectName =>            
{
    retryCount = 0;                        
    countOfFiles++;
    downloadSuccess = false;
    bucketFileName = Path.GetFileName(objectName.Name);
    guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

    while (retryCount < retryCountInput && downloadSuccess == false)
    {
        try
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write);
            using (fs)
            {                                               
                storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress).Wait();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured while downloading file: " + ex.ToString());                   
            Thread.Sleep(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
            retryCount++;

        }
    }
}
4

2 回答 2

1

我会将其更改为任务并使用异步。这样你的 Thread.Sleep 就不会阻塞线程池线程。Parallel.ForEach 用于 CPU 密集型工作。

类似的东西:( 如果没有您的其余代码,我将无法编译/测试它)

int retryCountInput = 5;
var tasks = new List<Task>();

foreach (var file in listFiles)
{
    var task = Task.Run(async () =>
    {
        // make it local
        int retryCount = 0;
        string bucketFileName = Path.GetFileName(objectName.Name);
        string guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        while (retryCount < retryCountInput)
        {
            try
            {
                using (var fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write))
                    // Use await here, instead of `Wait()` so this threadpool thread
                    // can be used for other tasks.
                    await storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress);

                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occured while downloading file: " + ex.ToString());

                // Use Task.Delay here, so this thread is 'released'
                await Task.Delay(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
                retryCount++;
            }
        }
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);
于 2020-04-06T08:35:53.427 回答
0

我已经修改了我的代码并删除了,Parallel.ForEach而是使用foreach循环来遍历文件。但是现在,我无法在下载路径中找到所有文件,尽管日志显示所有文件都已下载。下载路径中下载文件的数量发生了变化,这种行为似乎是随机的。我可以Task.Run用于 I/O 操作吗?

var tasks = new List<Task>();
foreach (var objectName in listFiles)
{
    var task = Task.Run(() =>
    {
        downloadSuccess = false;
        bucketFileName = Path.GetFileName(objectName.Name);
        guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        var maxRetryAttempts = 3;
        var pauseBetweenFailures = TimeSpan.FromSeconds(2);
        RetryHelper.RetryOnException(maxRetryAttempts, pauseBetweenFailures, async () =>
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create,
                FileAccess.Write, FileShare.Write);
            using (fs)
            {
                var progress = new Progress<IDownloadProgress>(
                    p =>
                    {
                        DownloadProgress(p, retryCount, objectName.Name);
                    });

                await client.DownloadObjectAsync(bucketName, objectName.Name,
                    fs, option, cancellationToken.Token, progress);
            }
        });
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);
于 2020-04-23T06:10:36.207 回答