I have gone through dozens of articles but could not find a solution.
Here is the logic:
I have a WinForms (VS2010) application that needs to read data from a SQL Server 2008 R2 Express table A, run some calculations on it, and store the results in a different table B.
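I want to use Parallel.ForEach to shorten the execution time (otherwise the calculation + SQL process takes days...).
I have to read from SQL each time, because the database has more than 5 million rows and each read returns a few hundred of them.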
The lists are defined as:
BindingList<ItemsClass> etqM = new BindingList<ItemsClass>();
BindingList<ItemsClass> etqC = new BindingList<ItemsClass>();
The parallel execution:
Parallel.ForEach(etqC, cv => {
    readData(ref etqM, "tableA", "WHERE ID LIKE '" + cv.Name + "%'");
    IList<ItemsClass> eResults = etqM.OrderBy(f => f.ID).ToList();
    foreach (ItemsClass R in eResults)
    {
        // calculations go here
        etqM[rID] = R;
    }
    Parallel.ForEach(etqM, r => {
        // part 2 of the calculations goes here
    });
    exportList(etqM, "tableB", true);
});
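The SQL read function: it takes a list, a table name, and the SQL conditions, reads the matching records from SQL, and converts them into the list format.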
public void readData<T>(ref BindingList<T> etqList, string tableName, string conditions)
{
    SqlConnection myConnection = new SqlConnection();
    SqlCommand myCommand = new SqlCommand();
    myCommand.CommandTimeout = 0;
    myCommand.Connection = myConnection;
    etqList.Clear();
    openConn(myConnection);
    SqlDataReader myReader = null;
    try
    {
        int totalResults;
        myCommand.CommandText = "SELECT COUNT (*) FROM " + tableName + " " + conditions;
        totalResults = (int)myCommand.ExecuteScalar();
        if (totalResults > 0)
        {
            myCommand.CommandText = "SELECT * FROM " + tableName + " " + conditions;
            myReader = myCommand.ExecuteReader();
            etqList = ConvertTo<T>(convertReaderToDT(myReader));
        }
    }
    catch (Exception ex) { }
    finally
    {
        try { myReader.Close(); }
        catch { }
    }
    closeConn(myConnection);
}
The SQL export function: it exports the given list to the named table.
private void exportListToSql<T>(IEnumerable<T> etqList, string tableName)
{
    SqlConnection myConnection = new SqlConnection();
    SqlCommand myCommand = new SqlCommand();
    myCommand.CommandTimeout = 0;
    myCommand.Connection = myConnection;
    openConn(myConnection);
    try
    {
        actionTotalCount++;
        DataTable dt = new DataTable(tableName);
        dt = ToDataTable(etqList); // list name
        var bulkCopy = new SqlBulkCopy(myConnection,
            SqlBulkCopyOptions.TableLock |
            SqlBulkCopyOptions.FireTriggers |
            SqlBulkCopyOptions.UseInternalTransaction,
            null
        );
        bulkCopy.DestinationTableName = tableName;
        bulkCopy.BatchSize = BATCH_SIZE;
        bulkCopy.WriteToServer(dt);
    }
    catch (Exception ex) { }
    finally { closeConn(myConnection); }
}
The SQL openConn and closeConn functions:
void openConn(SqlConnection myConnection)
{
    if (myConnection.State == ConnectionState.Open) return;
    myConnection.ConnectionString = "Data Source=" + DB_NAME + ";Initial Catalog=APPDB;Integrated Security=True;Connect Timeout=120;Asynchronous Processing=true;";
    try { myConnection.Open(); actionTotalCount++; }
    catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}
void closeConn(SqlConnection myConnection)
{
    if (myConnection.State == ConnectionState.Fetching || myConnection.State == ConnectionState.Executing || myConnection.State == ConnectionState.Connecting) return;
    try { myConnection.Dispose(); }
    catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}
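The problem: as soon as I run it, I get this message:
ExecuteScalar requires an open and available Connection. The connection's current state is closed.
The message is raised on every thread except the first one.
Any ideas?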