0

我正在循环中创建异步任务,该任务必须从 sql 数据库中读取数据。随着源数据大小变大,我收到了这个错误:
  • 查询处理器无法启动并行查询执行所需的线程资源。

这是我的代码:


For Each objLocDataTable In pLocationData

    If objLocDataTable IsNot Nothing AndAlso objLocDataTable.ISVALID Then

       listLocationELTTasks.Add(TaskEx.Run(
             Async Function() As Task
                    Dim resTable As System.Data.DataTable = Await mDBUtil.QueryAsync(strQry)

                    ....... 'do calculation works based on the query result
             End Function))
    END if
Next


 Public Async Function QueryAsync(ByVal strSqlQuery As String) As Task(Of DataTable)

  Dim returnDataTable As DataTable = Nothing

  Dim nTriedNumber As Integer = 0, nMaxTryNumber = 10
  Dim bStop As Boolean = False

  While (Not bStop)
   Try
    Using dbConnection As New SqlConnection(mStrConn)
     Using dbComm As New SqlCommand(strSqlQuery, dbConnection)

      Await dbConnection.OpenAsync()
      dbComm.CommandType = CommandType.Text

      Dim readerTask As Task(Of SqlDataReader) = Task.Factory.FromAsync(AddressOf dbComm.BeginExecuteReader, AddressOf dbComm.EndExecuteReader, Nothing)

      returnDataTable = Await readerTask.ContinueWith(
       Function(pervTask)
        Dim resultTable As New DataTable()
        Dim reader As SqlDataReader = Nothing
        Try
         reader = pervTask.Result
         reader.Read()
         resultTable.Load(reader)
         bStop = True
        Catch ex As Exception
         System.Diagnostics.Debug.WriteLine(String.Format("Thread ID [{0}]: nTriedNumber [{1}], QueryAsync error: {2}", Threading.Thread.CurrentThread.ManagedThreadId, nTriedNumber, ex.Message))
        Finally
         If (reader IsNot Nothing) Then
          reader.Close()
         End If
         If (Not dbConnection.State = ConnectionState.Closed) Then
          dbConnection.Close()
         End If
        End Try
        Return resultTable
       End Function)
     End Using
    End Using

   Catch ex As Exception
    System.Diagnostics.Debug.WriteLine(String.Format("Not datareader error, nTriedNumber [{0}], QueryAsync error: {1}", nTriedNumber, ex.Message))
   Finally
    'could be shorten as if( returnDataTable Is Nothing or returnDataTable.Rows.Count() = 5) Then
     System.Diagnostics.Debug.WriteLine(String.Format("Thread ID [{0}]: QueryAsync is sleeping for 1 sec", Threading.Thread.CurrentThread.ManagedThreadId))
     System.Threading.Thread.Sleep(1000)
    End If

    If (nTriedNumber >= nMaxTryNumber) Then
     System.Diagnostics.Debug.WriteLine(String.Format("Thread ID [{0}]: Reach to max try number, failed to retrieve data", Threading.Thread.CurrentThread.ManagedThreadId))
     bStop = True
    End If
   End Try
  End While

  Return returnDataTable
 End Function

就像在函数 QueryAsync 中一样,我尝试过 - 尽快关闭数据库连接。- 甚至睡一会儿他们都不工作

注意: - sql 数据源大约有 5000 万条记录,我无法在 for 循环之前将其立即加载到内存中进行内存中 linq 查询。我试图用 Parrel.foreach 替换任务,结果相同。在任务中从 sql 数据库读取数据的有效方法是什么?

4

1 回答 1

0

看起来您不需要异步 IO。切换到同步数据库调用,你的生活变得非常轻松:

pLocationData
.AsParallel()
.WithDegreeOfParallelism(8) //choose the number by testing
.ForEach(objLocDataTable => Query(objLocDataTable));

(C# 语法)。我重命名为QueryAsyncQuery因为它应该是一个很好的旧同步方法,就像我们一直使用它一样。

这会同时使用 8 个线程执行您的查询。固定数量的线程不会使数据库过载。

于 2013-10-29T13:59:16.423 回答