20

我喜欢 TPL 中 Parallel.For 和 Parallel.ForEach 扩展方法的简单性。我想知道是否有办法利用类似的东西,甚至是稍微高级一点的任务。

下面是 SqlDataReader 的典型用法,我想知道它是否可行,如果可行,如何用 TPL 中的内容替换下面的 while 循环。因为读者不能提供固定数量的迭代,所以 For 扩展方法是不可能的,这让我不得不处理我收集的任务。我希望有人可能已经解决了这个问题,并通过 ADO.net 制定了一些注意事项。

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
4

2 回答 2

26

您将很难直接替换该 while 循环。 SqlDataReader不是线程安全类,因此不能直接从多个线程中使用它。

话虽如此,您可能会使用 TPL处理您读取的数据。这里有几个选项。最简单的方法可能是让您自己的IEnumerable<T>实现适用于阅读器,并返回包含您的数据的类或结构。然后,您可以使用 PLINQ 或Parallel.ForEach语句并行处理您的数据:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}

一旦你有了这个方法,你就可以通过 PLINQ 或 TPL 直接处理它:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

或者:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
于 2010-06-22T20:10:52.650 回答
20

您快到了。使用此签名将您发布在函数中的代码包装起来:

IEnumerable<IDataRecord> MyQuery()

然后// Do something with Reader用这个替换你的代码:

yield return reader;

现在你有了在一个线程中工作的东西。不幸的是,当您阅读查询结果时,它每次都会返回对同一个对象的引用,并且该对象只会在每次迭代时自行改变。这意味着如果你尝试并行运行它,你会得到一些非常奇怪的结果,因为并行读取会改变不同线程中使用的对象。您需要代码来获取记录的副本以发送到您的并行循环。

不过,在这一点上,我喜欢做的是跳过记录的额外副本,直接进入强类型类。更重要的是,我喜欢使用通用方法来做到这一点:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}

假设您的工厂方法按预期创建副本,则此代码应该可以安全地用于 Parallel.ForEach 循环。调用该方法看起来像这样(假设一个 Employee 类具有一个名为“Create”的静态工厂方法):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());

重要更新:
我对这段代码没有以前那么自信了。当另一个线程正在复制它时,一个单独的线程仍然可以改变阅读器。我可以锁定它,但我也担心另一个线程可能会在原件本身调用 Read() 之后但在它开始制作副本之前调用更新阅读器。因此,这里的关键部分由整个 while 循环组成……此时,您又回到了单线程。我希望有一种方法可以修改此代码以在多线程场景中按预期工作,但需要更多研究。

于 2010-06-22T20:50:53.737 回答