在开始其他任务之前,我需要进行一系列异步调用,其中一些依赖于其他任务的完成。我想为这些调用重试逻辑,因为它们都是对外部资源的调用。这是我们的原型代码,没有任何异常处理或重试逻辑:(这可以完成工作,但没有任何弹性或错误处理)。经过相当多的研究,我需要一些帮助以一种有弹性的方式将它们组合在一起。
Task uploadHdr = null;
Task uploadElig = null;
Task ImportHdrCPA = null;
Task ImportHdrCCRS = null;
Task ImportEligCPA = null;
Task ImportEligCCRS = null;
Task ProcessCPA = null;
Task ProcessCCRS = null;
//Connect to blob storage
CloudStorageAccount storageacc = CloudStorageAccount.Parse("connection string to blob storage");
CloudBlobClient blobClient = storageacc.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference("ourcontainer");
//Import Header to Choice PA
using (SqlConnection conCPA = new SqlConnection("connection string for CPA"))
using (SqlConnection conCCRS = new SqlConnection("connection string for CCRS"))
{
conCPA.Open();
conCCRS.Open();
//Upload HDR file
CloudBlockBlob hdrBlob = container.GetBlockBlobReference("Header.txt");
FileStream fsHdr = System.IO.File.OpenRead(@"C:\Development\Header.txt");
uploadHdr = hdrBlob.UploadFromStreamAsync(fsHdr);
allTasks.Add(uploadHdr);
Console.WriteLine("Started Header Upload " + stopwatch.Elapsed.ToString());
//Upload eligibility segment file
CloudBlockBlob elgBlob = container.GetBlockBlobReference("Detail.txt");
FileStream fsElig = System.IO.File.OpenRead(@"C:\Development\Detail.txt");
uploadElig = elgBlob.UploadFromStreamAsync(fsElig);
allTasks.Add(uploadElig);
Console.WriteLine("Started Detail Upload " + stopwatch.Elapsed.ToString());
while (allTasks.Any())
{
Task.WhenAny(allTasks.ToArray());
if (uploadHdr.IsCompletedSuccessfully && ImportHdrCPA == null)
{
SqlCommand cmdHdrCPA = new SqlCommand("dbo.spImportHeader", conCPA) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
cmdHdrCPA.Parameters.Add("@FileName", SqlDbType.VarChar);
cmdHdrCPA.Parameters["@FileName"].Value = "Header.txt";
ImportHdrCPA = cmdHdrCPA.ExecuteNonQueryAsync();
allTasks.Add(ImportHdrCPA);
Console.WriteLine("Started Header Import 1 " + stopwatch.Elapsed.ToString());
}
if (ImportHdrCPA.IsCompletedSuccessfully && ImportHdrCCRS == null)
{
SqlCommand cmd = new SqlCommand("dbo.spNCEligibilityImportHeader", conCCRS) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
cmd.Parameters.Add("@FileName", SqlDbType.VarChar);
cmd.Parameters["@FileName"].Value = "CSC_NCEligibility_Hdr_i_20191105.txt";
ImportHdrCCRS = cmd.ExecuteNonQueryAsync();
allTasks.Add(ImportHdrCCRS);
Console.WriteLine("Started Header Import 2 " + stopwatch.Elapsed.ToString());
}
if(uploadElig.IsCompletedSuccessfully && ImportEligCPA == null)
{
SqlCommand cmd = new SqlCommand("dbo.spImportDetail", conCPA) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
cmd.Parameters.Add("@FileName", SqlDbType.VarChar);
cmd.Parameters["@FileName"].Value = "Detail.txt";
ImportEligCPA = cmd.ExecuteNonQueryAsync();
allTasks.Add(ImportEligCPA);
Console.WriteLine("Started Detail Import 1 " + stopwatch.Elapsed.ToString());
}
if(ImportEligCPA != null && ImportEligCPA.IsCompletedSuccessfully && ImportEligCCRS == null)
{
SqlCommand cmd = new SqlCommand("dbo.spImportDetail", conCCRS) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
cmd.Parameters.Add("@FileName", SqlDbType.VarChar);
cmd.Parameters["@FileName"].Value = "Detail.txt";
ImportEligCCRS = cmd.ExecuteNonQueryAsync();
allTasks.Add(ImportEligCCRS);
Console.WriteLine("Started Detail Import 2 " + stopwatch.Elapsed.ToString());
}
if (ImportHdrCPA.IsCompletedSuccessfully && ImportEligCPA != null && ImportEligCPA.IsCompletedSuccessfully && ProcessCPA == null)
{
SqlCommand cmd = new SqlCommand("dbo.spProcess", conCPA) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
ProcessCPA = cmd.ExecuteNonQueryAsync();
allTasks.Add(ProcessCPA);
Console.WriteLine("Started Processing 1 " + stopwatch.Elapsed.ToString());
}
if (ImportHdrCCRS != null && ImportHdrCCRS.IsCompletedSuccessfully && ImportEligCCRS != null && ImportEligCCRS.IsCompletedSuccessfully && ProcessCCRS == null)
{
SqlCommand cmd = new SqlCommand("dbo.spProcess", conCCRS) { CommandType = CommandType.StoredProcedure, CommandTimeout = 0 };
ProcessCCRS = cmd.ExecuteNonQueryAsync();
allTasks.Add(ProcessCCRS);
Console.WriteLine("Started Processing 2 " + stopwatch.Elapsed.ToString());
}
if (ProcessCCRS != null && ProcessCPA != null)
{
Task.WaitAll(allTasks.ToArray());
allTasks.Clear();
}
Thread.Sleep(5000);
}