1

我希望有人可以通过 TPL 和 SQL 连接确认这里实际发生了什么。

基本上,我有一个大型应用程序,它本质上从 SQL Server 读取一个表,然后依次处理每一行。每一行的处理可能需要相当长的时间。所以,我想改变它以使用任务并行库,在数据表的行中使用“Parallel.ForEach”。这似乎工作了一段时间(几分钟),然后一切都变成了梨形......

“在从池中获取连接之前已经过了超时时间。这可能是因为所有池连接都在使用中并且达到了最大池大小。”

现在,我推测了以下内容(这当然可能完全错误)

“ForEach”为每一行创建任务,根据核心(或其他)的数量达到一定的限制。让我们说 4 想要一个更好的主意。这四个任务中的每一个都有一行,然后开始处理它。TPL 等到机器不太忙,然后再启动一些。我预计最多四个。

但这不是我观察到的——也不是我认为正在发生的事情。

所以......我写了一个快速测试(见下文)

Sub Main()
    Dim tbl As New DataTable()

    FillTable(tbl)

    Parallel.ForEach(tbl.AsEnumerable(), AddressOf ProcessRow)

End Sub

Private n As Integer = 0

Sub ProcessRow(row As DataRow, state As ParallelLoopState)
    n += 1 ' I know... not thread safe
    Console.WriteLine("Starting thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
    Using cnx As SqlConnection = New SqlConnection(My.Settings.ConnectionString)
        cnx.Open()
        Thread.Sleep(TimeSpan.FromMinutes(5))
        cnx.Close()
    End Using
    Console.WriteLine("Closing thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
    n -= 1
End Sub

这比我对任务数量的猜测要多得多。所以,我推测 TPL 将任务启动到它认为会让我的机器忙的极限,但是嘿,这是什么,我们这里不是很忙,所以让我们开始更多。仍然不是很忙,所以......等等(似乎每秒钟都有一个新任务 - 大致)

这是合理的,但我希望它在何时以及如果它获得 100 个打开的 SQL 连接(默认连接池大小)之后会弹出 30 秒(SQL 连接超时),但它没有。

因此,为了稍微缩小一点,我更改了连接字符串以限制最大池大小。

Sub Main()
    Dim tbl As New DataTable()

    Dim csb As New SqlConnectionStringBuilder(My.Settings.ConnectionString)
    csb.MaxPoolSize = 10
    csb.ApplicationName = "Test 1"
    My.Settings("ConnectionString") = csb.ToString()

    FillTable(tbl)

    Parallel.ForEach(tbl.AsEnumerable(), AddressOf ProcessRow)

End Sub

我计算了与 SQL 服务器的实际连接数,正如预期的那样,它是 10。但是我的应用程序已经启动了 26 个任务 - 然后挂起。因此,为 SQL 设置最大池大小以某种方式将任务数限制为 26,但为什么没有 27,特别是为什么它不会因为池已满而在 11 时倒下?

显然,在某个地方,我要求的工作比我的机器所能做的更多,我可以将“MaxDegreesOfParallelism”添加到 ForEach,但我对这里实际发生的事情感兴趣。

PS。

实际上,在处理 26 个任务(我猜)5 分钟后,它确实会因原始(达到最大池大小)错误而崩溃。嗯?

谢谢。

编辑1:

实际上,我现在认为在任务(我的“ProcessRow”方法)中发生的事情是,在 10 个成功的连接/任务之后,第 11 个确实阻塞了连接超时,然后确实得到了原始异常 - 就像任何后续任务一样。

所以......我得出结论,TPL 正在以大约每秒 1 的速度创建任务,并且它有足够的时间在任务 11 引发异常之前创建大约 26/27。然后所有后续任务也会抛出异常(大约相隔一秒)并且 TPL 停止创建新任务(因为它在一个或多个任务中获得未处理的异常?)

由于某种原因(尚未确定),ForEach 会挂起一段时间。如果我修改我的 ProcessRow 方法以使用状态说“停止”,它似乎没有效果。

Sub ProcessRow(row As DataRow, state As ParallelLoopState)
    n += 1
    Console.WriteLine("Starting thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
    Try
        Using cnx As SqlConnection = fnNewConnection()
            Thread.Sleep(TimeSpan.FromMinutes(5))
        End Using
    Catch ex As Exception
        Console.WriteLine("Exception on thread {0}", Thread.CurrentThread.ManagedThreadId)
        state.Stop()
        Throw
    End Try
    Console.WriteLine("Closing thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
    n -= 1
End Sub

编辑2:

Dur... 长时间延迟的原因是,虽然任务 11 之后的任务都崩溃和烧毁,但任务 1 到 10 不会,并且都坐在那里睡了 5 分钟。TPL 已停止创建新任务(因为它创建的一个或多个任务中出现未处理的异常然后等待未崩溃的任务完成。

4

2 回答 2

1

对原始问题的编辑增加了更多细节,最终答案变得显而易见。

TPL 重复创建任务,因为它创建的任务基本上是空闲的。这很好,直到连接池耗尽,此时需要新连接的任务等待一个可用,然后超时。与此同时,TPL 仍在创造更多的任务,所有这些都注定要失败。连接超时后,任务开始失败,随后的异常导致 TPL 停止创建新任务。然后,在引发 AggregateException 之前,TPL 会等待确实获得连接的任务完成。

于 2013-09-20T11:26:38.127 回答
0

TPL 不适用于受 IO 限制的工作。它具有启发式方法,用于控制处于活动状态的线程数。这些启发式方法对于长时间运行和/或受 IO 限制的任务会失败,导致它在没有实际限制的情况下注入越来越多的线程。

使用 PLINQ 设置固定数量的线程,使用WithDegreeOfParallelism. 您可能应该测试不同的数量。它可能看起来像这样。我已经在 SO 上写了更多关于这个主题的文章,但目前我找不到。

我不知道为什么您在示例中看到正好26 个线程。请注意,当池耗尽时,建立连接的请求只会在超时后失败。整个系统是非常不确定的,我认为任何数量的线程都是合理的。

于 2013-09-19T17:30:07.447 回答