我最终自己解决了这个问题,把代码放在这里,以备其他人需要。
// Runs the stored procedure named by commandText once per row of `source`, in parallel,
// and returns every result row merged into a single DataTable.
//
// For each row, column 0 is converted to an int and substituted for the "@Sources"
// parameter as a structured value built by ToIDList() (an extension defined elsewhere);
// every other entry of `parms` is copied by name and value only.
//
// Each task uses its own SqlConnection. A single connection must not be driven from
// several threads at once, and enabling MARS does not change that (MARS operations are
// not thread-safe), so the connection string is used as-is and pooling supplies the
// per-task connections.
// NOTE(review): the number of simultaneously open connections is now limited by the
// thread pool and the pool's Max Pool Size - confirm this suits very large `source` tables.
DataTable dtReturn = new DataTable();

// Validate connectivity once, up front, so a bad connection string or unreachable server
// still surfaces as a direct (un-wrapped) exception instead of one AggregateException
// entry per row. With pooling enabled the physical connection is reused by the tasks.
using (var probe = new SqlConnection(_connectionstring))
{
    probe.Open();
}

Task<DataTable>[] tasks = new Task<DataTable>[source.Rows.Count];
for (int i = 0; i < source.Rows.Count; i++)
{
    // Read the id on the calling thread so each closure captures its own value and
    // `source` is never touched from a worker thread.
    int s = Convert.ToInt32(source.Rows[i][0]);

    tasks[i] = Task.Factory.StartNew(() =>
    {
        using (var connection = new SqlConnection(_connectionstring))
        using (var command = connection.CreateCommand())
        {
            command.CommandText = commandText;
            command.CommandType = CommandType.StoredProcedure;
            command.CommandTimeout = timeout;

            // Build fresh SqlParameter objects: a SqlParameter instance can belong to
            // only one command, so the caller's instances cannot be shared across tasks.
            foreach (SqlParameter p in parms)
            {
                if (p.ParameterName == "@Sources")
                {
                    command.Parameters.Add(new SqlParameter() { SqlDbType = System.Data.SqlDbType.Structured, ParameterName = p.ParameterName, Value = s.ToString().ToIDList() });
                }
                else
                {
                    command.Parameters.Add(new SqlParameter() { ParameterName = p.ParameterName, Value = p.Value });
                }
            }

            connection.Open();

            DataTable dt = new DataTable();
            // The reader is disposed on every path, including the empty-result one.
            using (var dr = command.ExecuteReader())
            {
                // Only load when rows exist, so an empty result stays a column-less
                // table and is skipped when choosing the schema below.
                if (dr.HasRows)
                {
                    dt.Load(dr);
                }
            }
            return dt;
        }
    });
}

// Blocks until every task has finished; failures inside tasks are rethrown here as an
// AggregateException with their original stack traces intact.
Task.WaitAll(tasks);

// Merge in task order. The schema comes from the first result that has columns.
bool isSchemaCloned = false;
foreach (var t in tasks)
{
    DataTable result = t.Result;
    if (!isSchemaCloned && result.Columns.Count > 0)
    {
        dtReturn = result.Clone();
        isSchemaCloned = true;
    }
    foreach (DataRow r in result.Rows)
    {
        dtReturn.Rows.Add(r.ItemArray);
    }
}
return dtReturn;