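Nested tasks or ContinueWith()? Where to place WhenAll()
I'm new to the TPL and would like some guidance on the right way to accomplish this. I've read up on the TPL using these resources, but I'm still unclear on the best approach:
- Parallel Programming with Microsoft .NET (read online): https://msdn.microsoft.com/en-us/library/ff963553.aspx
- MS Parallel Programming forum: https://social.msdn.microsoft.com/Forums/en-US/home?forum=parallelextensions
- Samples for Parallel Programming with the .NET Framework
- Task-based Asynchronous Programming
- and others
The end result is to submit more than 500,000 records to a third-party API. Trying to do this in a single step causes timeouts and other random errors.
Here is the current logic, which is only partially successful.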
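- AssignGroupNumbers assigns a group number to each record in the table. Each record ends up in one of n groups, up to 50 or so. The original idea was to process each of these groups on a new thread, so GroupCount is user-defined to let us experiment with different numbers of threads. With the TPL this may not be necessary.
- ProcessSourceData reads the records for a specific group and calls ProcessOneGroup.
- ProcessOneGroup creates small XML documents from the records in each group (about 100 records per document). This size is also user-defined to allow experimentation. ProcessOneGroup then calls SubmitToNLS, passing in the XML string.
- SubmitToNLS is the third-party API, which returns a success or failure status.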
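The batching that ProcessOneGroup performs could be sketched roughly as follows. This is only an illustration under assumptions: `BatchBuilder` and `BuildBatches` are hypothetical names, the records are assumed to already be XML fragments held as strings, and the `<actX>` wrapper element is a guess based on the closing tag in the code further down.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class BatchBuilder
{
    // Hypothetical helper: groups record strings into XML batches
    // containing at most batchSize records each.
    public static List<string> BuildBatches(IEnumerable<string> records, int batchSize)
    {
        if (records == null) throw new ArgumentNullException(nameof(records));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batches = new List<string>();
        var current = new StringBuilder();
        var count = 0;

        foreach (var record in records)
        {
            // Open a new batch when the previous one was flushed
            if (count == 0) current.AppendLine("<actX>");

            current.AppendLine(record);
            count++;

            // Close and store the batch once it is full
            if (count == batchSize)
            {
                current.AppendLine("</actX>");
                batches.Add(current.ToString());
                current.Clear();
                count = 0;
            }
        }

        // Flush the final, partially filled batch
        if (count > 0)
        {
            current.AppendLine("</actX>");
            batches.Add(current.ToString());
        }

        return batches;
    }
}
```

Building the complete list first keeps the database read separate from the submission step, so each batch can later be submitted independently of how it was read.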
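The diagram shows how I've broken this down into tasks, so my questions are:
- How can I accomplish this without blocking the UI? I've tried [Task.Run( --> Parallel.ForEach], but how do I follow up with the subsequent tasks?
- What is the best way to provide a progress bar, error handling, and user cancellation in this scenario?
I hope the diagram and narrative provide enough information. I've tried several approaches, but I don't have a presentable code sample at this point.
UPDATE: After a lot of reading and trial and error, I've written this code, which seems to do the job. It isn't DataFlow, but I plan to use that in my next project. Do you see any glaring problems with this code, particularly regarding exception handling and updating the UI?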
using ....
namespace actX
{
public partial class ImportactX_Parallel : Form
{
public ImportactX_Parallel()
{
InitializeComponent();
toolStripStatusLabel1.Text = string.Empty;
SetButtons("New");
}
Action _cancelWork;
/// <summary>
/// Assign Thread Numbers to each record in the selected dataset. Each group
/// of records will be extracted individually and then broken up into
/// batches of the size entered on this form.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnInitialize_Click(object sender, EventArgs e)
{
.
.
.
// ======================================================================== //
// Offload the Thread Number assignment so the UI thread remains responsive //
// ======================================================================== //
await Task.Run(() =>
{
try
{
using (var conn = new OracleConnection(oradb))
using (var cmd = new OracleCommand())
{
.
.
.
cmd.ExecuteNonQuery();
}
if (status.Contains("ERROR:"))
LogError("", process.Substring(0, 1), status);
}
catch (Exception ex)
{
status = "INITIALIZE ERROR: " + ex.Message;
}
});
// ================================================================= //
if (status.Contains("ERROR:"))
LogError("", process.Substring(0, 1), status);
.
.
.
MessageBox.Show("Initialization is complete", "Complete");
}
}
/// <summary>
/// Extract batches and add to a list with Parallel.For()
/// Submit each batch in the list with Parallel.ForEach()
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnSubmit_Click(object sender, EventArgs e)
{
.
.
.
try
{
// prepare to handle cancellation
var cts = new CancellationTokenSource();
var token = cts.Token;
this._cancelWork = () =>
{
this.btnCancel.Enabled = false;
cts.Cancel();
};
.
.
.
// ==========================
// Create the list of batches
// ==========================
await Task.Run(() => Parallel.For(1, threadCount + 1, options, (g, loopState) =>
{
var result = "Success";
try
{
using (var conn = new OracleConnection(oradb))
using (var cmdProcess = new OracleCommand())
{
.
.
.
// Dispose the reader when the loop below finishes or exits early
using (var rdr = cmdProcess.ExecuteReader())
while (moreRecordsExist)
{
// check for cancellation
if (token.IsCancellationRequested)
{
loopState.Break();
return;
}
if (totalRecordsInThisBatch == 0)
{
// Start a new batch
}
// Add the current record to the batch
xmlBatch.Append(rdr["***"] + Environment.NewLine);
// Read the next XML record
moreRecordsExist = totalRecordsInThisBatch < 5 && rdr.Read(); // TEST: limit the record count for testing
if (totalRecordsInThisBatch >= MaxRecordsInOneBatch || moreRecordsExist == false)
{
// End the current batch
xmlBatch.Append("</actX>" + Environment.NewLine);
// Add the batch to the list
lock (lockList)
{
batchList.Add(xmlBatch.ToString());
}
// reset record count to trigger a new batch
totalRecordsInThisBatch = 0;
}
}
}
// Update progress indicators
lock (lockProgress)
{
progressBar1.BeginInvoke((MethodInvoker)delegate
{
progressBar1.Value++;
});
lstStatus.BeginInvoke((MethodInvoker)delegate
{
lstStatus.Items.Add(String.Format("{0}: Building batch list: Thread {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), g));
lstStatus.TopIndex = lstStatus.Items.Count - 1;
});
}
}
catch (Exception ex)
{
// InnerException is often null, so fall back to the exception itself
result = String.Format("ERROR: {0}", (ex.InnerException ?? ex).Message);
}
if (result != "Success")
LogError("", process, result);
}));
lstStatus.Items.Add(String.Format("{0}: Building batch list: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
lstStatus.Items.Add("=============================================");
// ====================================================
// Submit all the batches in batchList to the processor
// ====================================================
var submitResult = await Task.Run(() => Parallel.ForEach(batchList, (batch, loopState) =>
{
var result = "Success";
// check for cancellation
if (token.IsCancellationRequested)
{
// All UI updates must be marshalled to the UI thread, including the status label
lstStatus.BeginInvoke((MethodInvoker)delegate
{
toolStripStatusLabel1.Text = "Cancellation requested, please wait for running threads to complete...";
lstStatus.Items.Add(String.Format("{0}: Cancellation requested...", DateTime.Now.ToString("yyyyMMddHHmmss")));
lstStatus.TopIndex = lstStatus.Items.Count - 1;
});
loopState.Break();
// Skip this batch rather than submitting it after cancellation was requested
return;
}
// Initialize the ActiveX control for the current thread
Type actXType = Type.GetTypeFromProgID("activex control");
dynamic actX = Activator.CreateInstance(actXType);
if (actX != null)
{
// Verify the DB connection to actX
actX.ConnectionName = ConfigurationManager.AppSettings["actXConnectionName"];
if (actX.InitializedConnection())
{
actX.ImportString = batch;
if (actX.ImportXML == true)
{
.
.
.
}
else
{
result = "ERROR: " + actX.ErrorMessage;
}
}
actX = null;
}
else
{
result = "ERROR: Unable to create API object.";
}
if (result.Contains("ERROR:"))
LogError("", process, result);
// Update progress indicators
lock (lockProgress)
{
lstStatus.BeginInvoke((MethodInvoker)delegate
{
lstStatus.Items.Add(String.Format("{0}: Submission result: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), result));
lstStatus.TopIndex = lstStatus.Items.Count - 1;
});
progressBar1.BeginInvoke((MethodInvoker)delegate
{
progressBar1.Value++;
});
}
}));
var cancelledByUser = submitResult.IsCompleted == false && submitResult.LowestBreakIteration.HasValue == true;
if (cancelledByUser)
{
toolStripStatusLabel1.Text = "Cancelled by user";
lstStatus.Items.Add(String.Format("{0}: Cancelled by user", DateTime.Now.ToString("yyyyMMddHHmmss")));
lstStatus.TopIndex = lstStatus.Items.Count - 1;
}
else
{
toolStripStatusLabel1.Text = "Complete";
lstStatus.Items.Add(String.Format("{0}: Submitting batches: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
lstStatus.TopIndex = lstStatus.Items.Count - 1;
}
}
catch (AggregateException ae)
{
// Handle exceptions
if (ae.InnerExceptions.Count > 0)
{
toolStripStatusLabel1.Text = "Completed: with errors.";
foreach (var ex in ae.InnerExceptions)
{
// InnerException may be null, so fall back to the exception itself
lstStatus.Items.Add(String.Format("{0}: AGGREGATED ERRORS: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), (ex.InnerException ?? ex).Message));
// Log the error
}
lstStatus.TopIndex = lstStatus.Items.Count - 1;
}
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
finally
{
// Save status to text file so we can verify that everything ran correctly
var logFile = "actXImport_" + DateTime.Now.ToString("yyyyMMddHHmmss") + ".log";
using (TextWriter tw = new StreamWriter(logFile, false))
{
for (int i = 0; i <= lstStatus.Items.Count - 1; i++)
tw.WriteLine(lstStatus.Items[i]);
}
MessageBox.Show("Import is complete." + Environment.NewLine + Environment.NewLine + "See the log (" + logFile + ") for details.", "Complete");
Application.Exit();
}
// set the buttons
SetButtons("Processed");
this._cancelWork = null;
}
}
private void btnCancel_Click(object sender, EventArgs e)
{
if (this._cancelWork != null)
this._cancelWork();
}
public void LogError(string actX_id, string process, string errMessage)
{
using (var conn = new OracleConnection(oradb))
using (var cmd = new OracleCommand())
{
try
{
if (errMessage.Length > 4000)
errMessage = errMessage.Substring(0, 3000);
.
.
.
cmd.ExecuteNonQuery();
}
catch (Exception)
{
// Rethrow as-is to preserve the original exception type and stack trace
throw;
}
}
}
public void SelectionsChanged(object sender, EventArgs e)
{
SetButtons("New");
}
private void SetButtons(string mode)
{
this.btnInitialize.Enabled = mode == "New" || mode == "Processed" || mode == "Initialized";
this.btnSubmit.Enabled = mode == "Initialized";
this.btnCancel.Enabled = mode == "Initializing" || mode == "Processing";
this.grpDataSet.Enabled = !btnCancel.Enabled;
this.grpProcess.Enabled = !btnCancel.Enabled;
}
}
}
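On the progress and cancellation question, one common alternative to calling BeginInvoke inside the loop is to report through `IProgress<T>` and let `ParallelOptions.CancellationToken` stop the loop. The following is a minimal sketch, not a drop-in replacement for the code above: `BatchSubmitter`, `SubmitAllAsync`, and the `submit` delegate (a stand-in for the third-party call) are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BatchSubmitter
{
    // submit is a hypothetical stand-in for the third-party call;
    // it is assumed to return true on success and false on failure.
    // Returns the number of failed batches.
    public static Task<int> SubmitAllAsync(
        IEnumerable<string> batches,
        Func<string, bool> submit,
        IProgress<int> progress,
        CancellationToken token,
        int maxDegreeOfParallelism = 4)
    {
        return Task.Run(() =>
        {
            var options = new ParallelOptions
            {
                CancellationToken = token,
                MaxDegreeOfParallelism = maxDegreeOfParallelism
            };
            var completed = 0;
            var failures = 0;

            // Parallel.ForEach stops scheduling new iterations and throws
            // OperationCanceledException once the token is cancelled.
            Parallel.ForEach(batches, options, batch =>
            {
                if (!submit(batch))
                    Interlocked.Increment(ref failures);

                // Report the number of batches finished so far
                progress?.Report(Interlocked.Increment(ref completed));
            });

            return failures;
        }, token);
    }
}
```

In a click handler this would be awaited inside a `try` block with a `catch (OperationCanceledException)` for the cancel case. A `Progress<int>` created on the UI thread, for example `new Progress<int>(n => progressBar1.Value = n)`, invokes its callback on that thread, so the loop body never touches a control directly.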