我希望有人可以通过 TPL 和 SQL 连接确认这里实际发生了什么。
基本上,我有一个大型应用程序,它本质上从 SQL Server 读取一个表,然后依次处理每一行。每一行的处理可能需要相当长的时间。所以,我想改变它以使用任务并行库,在数据表的行中使用“Parallel.ForEach”。这似乎工作了一段时间(几分钟),然后一切都变成了梨形......
“在从池中获取连接之前已经过了超时时间。这可能是因为所有池连接都在使用中并且达到了最大池大小。”
现在,我推测了以下内容(这当然可能完全错误)。
“ForEach”为每一行创建任务,根据核心(或其他)的数量达到一定的限制。让我们说 4 想要一个更好的主意。这四个任务中的每一个都有一行,然后开始处理它。TPL 等到机器不太忙,然后再启动一些。我预计最多四个。
但这不是我观察到的——也不是我认为正在发生的事情。
所以......我写了一个快速测试(见下文):
Sub Main()
Dim tbl As New DataTable()
FillTable(tbl)
Parallel.ForEach(tbl.AsEnumerable(), AddressOf ProcessRow)
End Sub
Private n As Integer = 0
Sub ProcessRow(row As DataRow, state As ParallelLoopState)
n += 1 ' I know... not thread safe
Console.WriteLine("Starting thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
Using cnx As SqlConnection = New SqlConnection(My.Settings.ConnectionString)
cnx.Open()
Thread.Sleep(TimeSpan.FromMinutes(5))
cnx.Close()
End Using
Console.WriteLine("Closing thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
n -= 1
End Sub
这比我对任务数量的猜测要多得多。所以,我推测 TPL 将任务启动到它认为会让我的机器忙的极限,但是嘿,这是什么,我们这里不是很忙,所以让我们开始更多。仍然不是很忙,所以......等等(似乎每秒钟都有一个新任务 - 大致)。
这是合理的,但我希望它在何时以及如果它获得 100 个打开的 SQL 连接(默认连接池大小)之后会弹出 30 秒(SQL 连接超时),但它没有。
因此,为了稍微缩小一点,我更改了连接字符串以限制最大池大小。
Sub Main()
Dim tbl As New DataTable()
Dim csb As New SqlConnectionStringBuilder(My.Settings.ConnectionString)
csb.MaxPoolSize = 10
csb.ApplicationName = "Test 1"
My.Settings("ConnectionString") = csb.ToString()
FillTable(tbl)
Parallel.ForEach(tbl.AsEnumerable(), AddressOf ProcessRow)
End Sub
我计算了与 SQL 服务器的实际连接数,正如预期的那样,它是 10。但是我的应用程序已经启动了 26 个任务 - 然后挂起。因此,为 SQL 设置最大池大小以某种方式将任务数限制为 26,但为什么没有 27,特别是为什么它不会因为池已满而在 11 时倒下?
显然,在某个地方,我要求的工作比我的机器所能做的更多,我可以将“MaxDegreesOfParallelism”添加到 ForEach,但我对这里实际发生的事情感兴趣。
PS。
实际上,在处理 26 个任务(我猜)5 分钟后,它确实会因原始(达到最大池大小)错误而崩溃。嗯?
谢谢。
编辑1:
实际上,我现在认为在任务(我的“ProcessRow”方法)中发生的事情是,在 10 个成功的连接/任务之后,第 11 个确实阻塞了连接超时,然后确实得到了原始异常 - 就像任何后续任务一样。
所以......我得出结论,TPL 正在以大约每秒 1 的速度创建任务,并且它有足够的时间在任务 11 引发异常之前创建大约 26/27。然后所有后续任务也会抛出异常(大约相隔一秒)并且 TPL 停止创建新任务(因为它在一个或多个任务中获得未处理的异常?)
由于某种原因(尚未确定),ForEach 会挂起一段时间。如果我修改我的 ProcessRow 方法以使用状态说“停止”,它似乎没有效果。
Sub ProcessRow(row As DataRow, state As ParallelLoopState)
n += 1
Console.WriteLine("Starting thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
Try
Using cnx As SqlConnection = fnNewConnection()
Thread.Sleep(TimeSpan.FromMinutes(5))
End Using
Catch ex As Exception
Console.WriteLine("Exception on thread {0}", Thread.CurrentThread.ManagedThreadId)
state.Stop()
Throw
End Try
Console.WriteLine("Closing thread {0}({1})", n, Thread.CurrentThread.ManagedThreadId)
n -= 1
End Sub
编辑2:
Dur... 长时间延迟的原因是,虽然任务 11 之后的任务都崩溃和烧毁,但任务 1 到 10 不会,并且都坐在那里睡了 5 分钟。TPL 已停止创建新任务(因为它创建的一个或多个任务中出现未处理的异常),然后等待未崩溃的任务完成。