2

我浏览了数十篇文章,但找不到解决方案。

这是逻辑:

我有一个 Winform (VS2010) 应用程序,它需要从 SQL Server 2008 R2 Express 表中读取数据A,处理一些计算并存储在不同的表中B

我想使用并行ForEach来缩短执行时间(否则计算+SQL过程需要几天......)

我必须从 SQL 读取,因为数据库有超过 500 万行,每次读取返回几百行。

列表定义为:

BindingList<ItemsClass> etqM = new BindingList<ItemsClass>();
BindingList<ItemsClass> etqC = new BindingList<ItemsClass>();

并行执行:

Parallel.ForEach(etqC, cv => {
            readData(ref etqM, "tableA", "WHERE ID LIKE '" + cv.Name + "%'");
            IList<ItemsClass> eResults = etqM.OrderBy(f => f.ID).ToList();

            foreach (ItemsClass R in eResults)
            {
                //calculations comes here

                etqM[rID] = R;
            }

            Parallel.ForEach(etqM, r => {
                // part 2 of calculations comes here
                }
            });
            exportList(etqM, "tableB", true);
        });

SQL读取函数:该函数获取一个List、表名+SQL从SQL读取记录的条件,并将它们转换为List格式。

public void readData<T>(ref BindingList<T> etqList, string tableName, string conditions)
    {
        SqlConnection myConnection = new SqlConnection();
        SqlCommand myCommand = new SqlCommand();
        myCommand.CommandTimeout = 0;
        myCommand.Connection = myConnection;

        etqList.Clear();            
        openConn(myConnection);
        SqlDataReader myReader = null;

        try
        {
            int totalResults;
            myCommand.CommandText = "SELECT COUNT (*) FROM " + tableName + " " + conditions;
            totalResults = (int)myCommand.ExecuteScalar();

            if (totalResults > 0)
            {
                myCommand.CommandText = "SELECT * FROM " + tableName + " " + conditions;
                myReader = myCommand.ExecuteReader();
                etqList = ConvertTo<T>(convertReaderToDT(myReader));
            }
        }
        catch (Exception ex) { }
        finally
        {
            try { myReader.Close(); }
            catch { }
        }
        closeConn(myConnection);
    }

SQL 导出函数:此函数将给定列表导出到表名。

private void exportListToSql<T>(IEnumerable<T> etqList, string tableName)
{
        SqlConnection myConnection = new SqlConnection();
        SqlCommand myCommand = new SqlCommand();
        myCommand.CommandTimeout = 0;
        myCommand.Connection = myConnection;
        openConn(myConnection);

        try
        {
            actionTotalCount++;
            DataTable dt = new DataTable(tableName);
            dt = ToDataTable(etqList);//List Name
            var bulkCopy = new SqlBulkCopy(myConnection,
                SqlBulkCopyOptions.TableLock |
                SqlBulkCopyOptions.FireTriggers |
                SqlBulkCopyOptions.UseInternalTransaction,
                null
                );
            bulkCopy.DestinationTableName = tableName;
            bulkCopy.BatchSize = BATCH_SIZE;
            bulkCopy.WriteToServer(dt);
        }
        catch (Exception ex) {  }
        finally { closeConn(myConnection); }
    }

SQLopenConncloseConn

void openConn(SqlConnection myConnection)
{
        if (myConnection.State == ConnectionState.Open) return;
        myConnection.ConnectionString = "Data Source=" + DB_NAME + ";Initial Catalog=APPDB;Integrated Security=True;Connect Timeout=120;Asynchronous Processing=true;";

        try { myConnection.Open(); actionTotalCount++; }
        catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}

void closeConn(SqlConnection myConnection)
{
        if (myConnection.State == ConnectionState.Fetching || myConnection.State == ConnectionState.Executing || myConnection.State == ConnectionState.Connecting) return;
        try { myConnection.Dispose(); }
        catch (System.Exception ex) { MessageBox.Show(ex.Message); }
}

问题是:一旦我执行,我就会收到这条消息:

ExecuteScalar 需要一个开放且可用的连接。连接的当前状态为关闭。

此消息到达所有线程,第一个线程除外。

有任何想法吗 ?

4

1 回答 1

0

显然问题不在于 SQL,而在于计算。由于 List 是在 'Parallel.Foreach' 之外定义的,因此它被不同的线程同时访问,从而导致崩溃。

一旦我改变了代码如下,一切都很好:

Parallel.ForEach(etqC, cv => {

        BindingList<ItemsClass> etqM = new BindingList<ItemsClass>();

        readData(ref etqM, "tableA", "WHERE ID LIKE '" + cv.Name + "%'");
        IList<ItemsClass> eResults = etqM.OrderBy(f => f.ID).ToList();

        foreach (ItemsClass R in eResults)
        {
            //calculations comes here

            etqM[rID] = R;
        }

        Parallel.ForEach(etqM, r => {
            // part 2 of calculations comes here
            }
        });
        exportList(etqM, "tableB", true);
    });
于 2012-10-25T14:36:01.390 回答