
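I have an SSIS Script Transformation task that I use as my final destination task to insert data into a SQL Server table. I use a transformation task instead of a SQL Server Destination task because I don't know in advance what the columns will be in the table we will be inserting into.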

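In a Foreach Loop container, I look for Access databases (97 format). The rest of the control flow basically creates a new SQL database and a table. The Access files are what we call "minute" databases; they contain minute data collected by another process. I need to create a new SQL database named after the "minute" database, and a table named "MINUTE" whose columns are created from certain information in the Access database. For each of our clients, the number of parameters at their site determines how many columns I need to create in the SQL MINUTE table.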
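
To make the column rule concrete, here is a minimal Python sketch (an illustration, not the package's code) of how the script further down names the columns: it starts with `dtmTime` and adds a `VALUE_n, STATUS_n` pair for each distinct `PARAM_INDEX`, in the order the indexes are first seen. The function name `minute_columns` is hypothetical.

```python
from typing import Iterable, List


def minute_columns(param_indexes: Iterable[int]) -> List[str]:
    """Return the MINUTE table column names for the given PARAM_INDEX values.

    Duplicates are ignored; the first occurrence of an index fixes its position.
    """
    columns = ["dtmTime"]
    seen = set()
    for p in param_indexes:
        if p not in seen:
            seen.add(p)
            columns += [f"VALUE_{p}", f"STATUS_{p}"]
    return columns


# A site with 3 parameters gets 1 + 2 * 3 = 7 columns.
print(", ".join(minute_columns([1, 2, 3, 1, 2, 3])))
# prints: dtmTime, VALUE_1, STATUS_1, VALUE_2, STATUS_2, VALUE_3, STATUS_3
```

Under this rule, a client with 50 parameters ends up with 1 + 2 × 50 = 101 columns.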

In the data flow, I have two key components: an OLE DB Source component (Source - Minute Table) and a Script Transformation task (Destination - Minute Table).

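"Source - Minute Table" reads the data from the Access database. "Destination - Minute Table" transforms the data and inserts it into the appropriate database and table.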

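Everything works. I tested it on a database with 491,000+ records and it took 1 minute. However, I am now testing with one of our larger clients, who has 50+ parameters, and their Access database contains 2+ million records. The package flies along until it reaches about 477,000 records, and then it practically comes to a halt. I can wait 10 minutes or even longer for the record count to update, and then the waiting starts all over again.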

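I have done a lot of research and followed every recommendation and guideline I found. My data source is not sorted. I use a SQL command instead of a table in the OLE DB source, and so on. I have changed the values of DefaultBufferMaxRows and DefaultBufferSize many times and always get the same result.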

Code:

```vb
Imports System
Imports System.Data
Imports System.Data.OleDb
Imports System.Data.SqlClient
Imports System.Globalization
Imports System.IO
Imports System.Text
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Public Class ScriptMain
    Inherits UserComponent

    Private conn As SqlConnection
    Private cmd As SqlCommand
    Private DBName As SqlParameter
    Private columnsForInsert As SqlParameter
    Private tableValues As SqlParameter
    Private numberOfParams As Integer
    Private db As String
    Private folderPath As String
    Private dbConn As String
    Private folder As String
    Private columnParamIndex As Integer
    Private columnDate As DateTime
    Private columnMinValue As Double
    Private columnStatus As String
    Private columnCnt1 As Int16
    Private dateAdded As Boolean = False
    Private columnStatusCnt As String
    Private columnsConstructed As Boolean = False
    Private buildValues As StringBuilder
    Private columnValues As StringBuilder
    Private i As Integer = 0

    ' Called once, before rows begin to be processed in the data flow.
    Public Overrides Sub PreExecute()
        MyBase.PreExecute()

        Try
            db = Variables.varMinFileName.ToString
            folder = Variables.varMinFolderName.ToString
            folderPath = folder & "\" & db & ".mdb"
            dbConn = "Provider=Microsoft.Jet.OLEDB.4.0;Data Source=" & folderPath

            ' Number of distinct parameters = number of source rows that make up one output row.
            Using SourceDataAdapter As New OleDbDataAdapter("SELECT DISTINCT PARAM_INDEX FROM [MINUTE];", dbConn)
                Dim SourceDatatable As New DataTable
                SourceDataAdapter.Fill(SourceDatatable)
                numberOfParams = SourceDatatable.Rows.Count
            End Using

            buildValues = New StringBuilder
            columnValues = New StringBuilder
            columnValues.Append("dtmTime, ")
        Catch ex As Exception
            LogError(ex)
        End Try
    End Sub

    ' Called after all the rows have passed through this component.
    Public Overrides Sub PostExecute()
        MyBase.PostExecute()
        buildValues = Nothing
        columnValues = Nothing
    End Sub

    Public Overrides Sub Input0_ProcessInput(Buffer As Input0Buffer)
        While Buffer.NextRow()
            Input0_ProcessInputRow(Buffer)
        End While
    End Sub

    ' Called once for every row that passes through the component from Input0.
    Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)
        Dim rtnValue As String = Variables.varMinFileName.Replace("_", "")

        Try
            For Each column As IDTSInputColumn100 In Me.ComponentMetaData.InputCollection(0).InputColumnCollection
                Dim colName As String = column.Name

                If colName.Contains("NULL") Then Continue For

                Select Case colName
                    Case "PARAM_INDEX"
                        columnParamIndex = CType(Row.PARAMINDEX, Byte)
                        ' Column names are collected only while the first output row is being built.
                        If Not columnsConstructed AndAlso i <= numberOfParams - 1 Then
                            columnValues.Append(String.Format("VALUE_{0}, STATUS_{0}, ", columnParamIndex))
                        End If
                    Case "dtmTIME"
                        columnDate = CType(Row.dtmTIME, DateTime)
                        If Not dateAdded Then ' only need to add once since rows are vertical
                            ' ISO 8601 ("s") so SQL Server parses the literal regardless of culture.
                            buildValues.Append("'" & columnDate.ToString("s", CultureInfo.InvariantCulture) & "', ")
                            dateAdded = True
                        End If
                    Case "MIN_VALUE"
                        columnMinValue = CType(Row.MINVALUE, Double)
                        ' Invariant culture keeps "." as the decimal separator in the SQL text.
                        buildValues.Append(columnMinValue.ToString(CultureInfo.InvariantCulture) & ", ")
                    Case "MIN_STATUS"
                        columnStatus = CType(Row.MINSTATUS, String)
                    Case "MIN_CNT_1"
                        columnCnt1 = CType(Row.MINCNT1, Byte)
                        columnStatusCnt = columnStatus & "010" & columnCnt1.ToString.PadLeft(5, "0"c) & "-----"
                        buildValues.Append("'" & columnStatusCnt & "', ")
                End Select
            Next

            If i = numberOfParams - 1 Then
                If Not columnsConstructed Then
                    columnValues.Remove(columnValues.Length - 2, 1) ' drop the trailing comma
                End If

                buildValues.Remove(buildValues.Length - 2, 1)

                ' One stored procedure call (and one new connection) per output row.
                SetStoredProc()

                cmd.Parameters("@Columns").Value = columnValues.ToString
                cmd.Parameters("@DBName").Value = "[" & rtnValue & "].[dbo].[MINUTE]"
                cmd.Parameters("@Values").Value = buildValues.ToString()
                cmd.ExecuteNonQuery()

                buildValues.Clear()

                columnsConstructed = True
                dateAdded = False
                columnParamIndex = 0
                columnMinValue = 0
                columnStatus = String.Empty
                columnCnt1 = 0

                i = 0
                cmd.Dispose()
                conn.Close()
                conn.Dispose()
            Else
                i += 1
            End If
        Catch ex As Exception
            LogError(ex)
        End Try
    End Sub

    Private Sub SetStoredProc()
        Try
            Dim dbConnection As String = "Server=(local)\SQLExpress;Database=DataConversion;User ID=sa;Password=sa123;"
            conn = New SqlConnection(dbConnection)
            conn.Open()
            cmd = New SqlCommand("dbo.InsertValues", conn) With {.CommandType = CommandType.StoredProcedure}

            columnsForInsert = New SqlParameter("@Columns", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
            cmd.Parameters.Add(columnsForInsert)

            DBName = New SqlParameter("@DBName", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
            cmd.Parameters.Add(DBName)

            tableValues = New SqlParameter("@Values", SqlDbType.VarChar, -1) With {.Direction = ParameterDirection.Input}
            cmd.Parameters.Add(tableValues)
        Catch ex As Exception
            LogError(ex)
        End Try
    End Sub

    ' Appends the exception message to the log file.
    Private Sub LogError(ex As Exception)
        Using writer As New StreamWriter("C:\MinuteLog.log", True, Encoding.ASCII)
            writer.WriteLine(ex.Message)
        End Using
    End Sub
End Class
```
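
The per-row bookkeeping in the script (the counter `i`, `dateAdded`, the two `StringBuilder`s) can be hard to follow. This Python sketch is an illustration of the grouping it performs, assuming, as the script does, that rows arrive ordered by time and then by parameter: every `numberOfParams` consecutive source rows become one wide row, the date is taken once, and each parameter contributes its value plus the status string built from `MIN_STATUS`, `"010"`, `MIN_CNT_1` zero-padded to five digits, and `"-----"`. The names `wide_rows` and `status_text` are hypothetical.

```python
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, int, float, str, int]  # (dtmTIME, PARAM_INDEX, MIN_VALUE, MIN_STATUS, MIN_CNT_1)


def status_text(status: str, cnt: int) -> str:
    """Same layout as the script: status + "010" + count left-padded with zeros to 5 + "-----"."""
    return f"{status}010{str(cnt).rjust(5, '0')}-----"


def wide_rows(rows: Iterable[Row], number_of_params: int) -> Iterator[List[object]]:
    """Collapse every `number_of_params` consecutive source rows into one wide row."""
    group: List[Row] = []
    for row in rows:
        group.append(row)
        if len(group) == number_of_params:
            wide: List[object] = [group[0][0]]  # dtmTime is added once per group
            for _, _, value, status, cnt in group:
                wide += [value, status_text(status, cnt)]
            yield wide
            group = []
    # An incomplete trailing group is never emitted, just as the script never inserts it.


rows = [
    ("2013-07-01 00:00", 1, 1.5, "OK", 7),
    ("2013-07-01 00:00", 2, 2.5, "OK", 12),
    ("2013-07-01 00:01", 1, 3.0, "AL", 0),
    ("2013-07-01 00:01", 2, 4.0, "OK", 1),
]
for wide in wide_rows(rows, number_of_params=2):
    print(wide)
```

Each wide row corresponds to one `dbo.InsertValues` call through a newly opened connection, so with 50 parameters, 2,000,000 source rows turn into 2,000,000 / 50 = 40,000 separate stored procedure calls.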

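Since I can't upload images here yet, I've included a link to a blog post I created with plenty of screenshots to help explain the problem: SSIS Slows Down During Transformation Task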

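Any help figuring out why my package slows down after about 400,000 records and doesn't process all 2+ million records in a reasonable amount of time is greatly appreciated!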

Thanks, Jimmy


2 Answers


This may not be very helpful, but my guess is that you're running out of memory. In my experience, once SSIS has to start paging, you've had it.

Could you somehow split the work into several smaller runs?
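
One way to act on this suggestion, sketched in Python with the standard-library `sqlite3` module standing in for SQL Server (an assumption made only so the example runs anywhere): read the source lazily and insert it in fixed-size chunks, committing each chunk in its own transaction, so no single step has to hold all 2+ million rows. The helpers `batches` and `load_in_batches`, the `minute` table layout, and the batch size are illustrative.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, List


def batches(rows: Iterable[tuple], size: int) -> Iterator[List[tuple]]:
    """Yield consecutive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def load_in_batches(con: sqlite3.Connection, rows: Iterable[tuple], size: int = 10_000) -> int:
    """Insert rows into the `minute` table one batch (and one transaction) at a time."""
    total = 0
    for chunk in batches(rows, size):
        with con:  # commits the batch, or rolls it back if an insert fails
            con.executemany(
                "INSERT INTO minute (dtmTime, PARAM_INDEX, MIN_VALUE) VALUES (?, ?, ?)", chunk
            )
        total += len(chunk)
    return total


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE minute (dtmTime TEXT, PARAM_INDEX INTEGER, MIN_VALUE REAL)")
source = ((f"t{n}", n % 50, float(n)) for n in range(25_000))  # generated lazily, never all in memory
print(load_in_batches(con, source, size=10_000))  # 25000
```

If the wide rows are assembled per batch, batch boundaries need to fall between minutes (for example, a batch size that is a multiple of the parameter count when every minute has one row per parameter), otherwise a minute's values get split across two batches.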

answered 2013-08-13T10:57:27.210

The full solution, with screenshots, is on my blog: SSIS Slowdown Solved

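To keep SSIS from slowing down when a large number of records are transformed and inserted into SQL Server (my destination), I redesigned my SSIS package. Instead of running an insert from the Script Transformation task for every record that passes through the buffer, I removed that step and used a stored procedure to do a bulk insert. To make this work, I read the data from each Access database into a table named "MINUTE" in my SQL Server instance. This staging MINUTE table has the same schema as the Access database, and I let SSIS do the heavy lifting of importing all the data into it. Once the data is imported, I execute a stored procedure that transforms the data in this table (horizontal records: one row per parameter per minute) and bulk inserts it into my new destination MINUTE SQL table (vertical records: one wide row per minute).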
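
A rough model of this staging design, in Python with `sqlite3` standing in for SQL Server (an assumption; SQLite has no `PIVOT`, so conditional aggregation with `MAX(CASE ...)` takes its place here): the tall rows are loaded with one bulk call, then a single set-based `INSERT ... SELECT` produces one wide row per minute. The table names `staging_minute` and `minute_wide` and the function `stage_and_pivot` are hypothetical; the status expression zero-pads `MIN_CNT_1` to five digits between `"000"` and `"-----"`.

```python
import sqlite3
from typing import Iterable, List, Tuple

Row = Tuple[str, int, float, str, int]  # (dtmTime, PARAM_INDEX, MIN_VALUE, MIN_STATUS, MIN_CNT_1)


def stage_and_pivot(rows: Iterable[Row], param_indexes: Iterable[int]) -> List[tuple]:
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE staging_minute (dtmTime TEXT, PARAM_INDEX INTEGER, "
        "MIN_VALUE REAL, MIN_STATUS TEXT, MIN_CNT_1 INTEGER)"
    )
    # Step 1: bulk-load the tall rows in one call (the part SSIS does in the redesign).
    with con:
        con.executemany("INSERT INTO staging_minute VALUES (?, ?, ?, ?, ?)", rows)

    # Step 2: build one wide row per minute with a single set-based statement.
    column_defs = ["dtmTime TEXT"]
    select_list = ["dtmTime"]
    for p in (int(x) for x in param_indexes):  # int() keeps the generated column names safe
        column_defs += [f"VALUE_{p} REAL", f"STATUS_{p} TEXT"]
        select_list.append(f"MAX(CASE WHEN PARAM_INDEX = {p} THEN MIN_VALUE END)")
        select_list.append(
            f"MAX(CASE WHEN PARAM_INDEX = {p} THEN MIN_STATUS || '000' || "
            "substr('00000' || MIN_CNT_1, -5, 5) || '-----' END)"
        )
    con.execute(f"CREATE TABLE minute_wide ({', '.join(column_defs)})")
    with con:
        con.execute(
            f"INSERT INTO minute_wide SELECT {', '.join(select_list)} "
            "FROM staging_minute GROUP BY dtmTime"
        )
    return con.execute("SELECT * FROM minute_wide ORDER BY dtmTime").fetchall()


sample = [
    ("2013-07-01 00:00", 1, 1.5, "OK", 7),
    ("2013-07-01 00:00", 2, 2.5, "OK", 12),
    ("2013-07-01 00:01", 1, 3.0, "AL", 0),
    ("2013-07-01 00:01", 2, 4.0, "OK", 1),
]
for wide in stage_and_pivot(sample, [1, 2]):
    print(wide)
```

The point of the design is that the database sees two large operations per Access file instead of one round trip per minute of data.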

The stored procedure that transforms the data and performs the bulk insert looks like this:

```sql
CREATE PROCEDURE [dbo].[InsertMinuteBulk]
    -- Add the parameters for the stored procedure here
    (@Columns varchar(MAX), @DBName varchar(4000))
AS
BEGIN
    DECLARE @SQL varchar(MAX)

    -- STATUS_ = MIN_STATUS + '000' + MIN_CNT_1 zero-padded to five digits + '-----'
    SET @SQL = ';WITH Base AS (
        SELECT dtmTime,
            param_index,
            CONVERT(nvarchar(16), MIN_VALUE) AS [VALUE_],
            CONVERT(nvarchar(3), MIN_STATUS) + ''000'' + RIGHT(REPLICATE(''0'', 5) + CONVERT(nvarchar(5), MIN_CNT_1), 5) + ''-----'' AS [STATUS_]
        FROM [DataConversion].[dbo].[MINUTE]
    )
    ,norm AS (
        SELECT dtmTime, ColName + CONVERT(varchar, param_index) AS ColName, ColValue
        FROM Base
        UNPIVOT (ColValue FOR ColName IN ([VALUE_], [STATUS_])) AS pvt
    )
    INSERT INTO ' + @DBName + '
    SELECT *
    FROM norm
    PIVOT (MIN(ColValue) FOR ColName IN (' + @Columns + ')) AS pvt'

    EXEC (@SQL);
END
```
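
The two CTEs and the final `PIVOT` are dense when written as dynamic SQL. The pure-Python sketch below is an illustration of the same reshaping, not part of the solution: `Base` formats the `VALUE_` and `STATUS_` text for each source row, `UNPIVOT` turns each row into two `(dtmTime, ColName, ColValue)` triples with the parameter index appended to the column name, and `PIVOT (MIN(ColValue) ...)` folds those triples back into one row per `dtmTime`, keeping only the columns named in `@Columns`. The function names are hypothetical, and `MIN_CNT_1` is zero-padded to five digits.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Row = Tuple[str, int, float, str, int]  # (dtmTime, param_index, MIN_VALUE, MIN_STATUS, MIN_CNT_1)


def unpivot(rows: Iterable[Row]) -> List[Tuple[str, str, str]]:
    """Base + norm CTEs: two (dtmTime, ColName, ColValue) triples per source row."""
    out = []
    for dtm, param_index, value, status, cnt in rows:
        value_text = str(value)                                    # [VALUE_]
        status_text = f"{status}000{str(cnt).rjust(5, '0')}-----"  # [STATUS_]
        out.append((dtm, f"VALUE_{param_index}", value_text))
        out.append((dtm, f"STATUS_{param_index}", status_text))
    return out


def pivot(triples: Iterable[Tuple[str, str, str]], columns: List[str]) -> List[List[Optional[str]]]:
    """PIVOT (MIN(ColValue) FOR ColName IN (columns)): one row per dtmTime."""
    cells: Dict[str, Dict[str, str]] = defaultdict(dict)
    for dtm, col, val in triples:
        if col in columns:  # names outside the IN list are dropped
            prev = cells[dtm].get(col)
            cells[dtm][col] = val if prev is None else min(prev, val)
    return [[dtm] + [cells[dtm].get(c) for c in columns] for dtm in sorted(cells)]


rows = [("2013-07-01 00:00", 1, 1.5, "OK", 7), ("2013-07-01 00:00", 2, 2.5, "OK", 12)]
triples = unpivot(rows)
print(triples)
print(pivot(triples, ["VALUE_1", "STATUS_1", "VALUE_2", "STATUS_2"]))
```

Because `INSERT INTO ... SELECT *` has no column list, the order of the names in `@Columns` has to match the column order of the destination MINUTE table.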

In the data flow task, "Minute Data Source" is an ADO.NET source that feeds the data into my SQL Server destination, "Minute Data Destination".

In the control flow, the final task, "Bulk Insert Minute Data", executes the bulk insert stored procedure.
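
Because `InsertMinuteBulk` splices `@DBName` and `@Columns` straight into dynamic SQL, whatever task passes them in should bracket-quote each identifier. A minimal Python sketch of building both arguments, assuming the same naming as the question's script (database name = Access file name with the underscores removed). `quote_name` imitates only the bracket-escaping part of T-SQL `QUOTENAME` with its default brackets (a `]` inside the name is doubled); the other function names and the file name `Site_A_2013_07` are hypothetical.

```python
from typing import Iterable


def quote_name(identifier: str) -> str:
    """Bracket-quote an identifier the way QUOTENAME(identifier) does: ']' becomes ']]'."""
    return "[" + identifier.replace("]", "]]") + "]"


def db_name_argument(minute_file_name: str) -> str:
    """@DBName: [<file name without underscores>].[dbo].[MINUTE], as in the original script."""
    database = minute_file_name.replace("_", "")
    return ".".join(quote_name(part) for part in (database, "dbo", "MINUTE"))


def columns_argument(param_indexes: Iterable[int]) -> str:
    """@Columns: the PIVOT IN list, one VALUE_n/STATUS_n pair per parameter."""
    names = []
    for p in param_indexes:
        names += [f"VALUE_{p}", f"STATUS_{p}"]
    return ", ".join(quote_name(n) for n in names)


print(db_name_argument("Site_A_2013_07"))  # [SiteA201307].[dbo].[MINUTE]
print(columns_argument([1, 2]))            # [VALUE_1], [STATUS_1], [VALUE_2], [STATUS_2]
```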

Given the volume of data I'm reading, transforming, and inserting, the package now runs without stalling and is very fast.

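I've run the package as an SSIS job, and it takes 38 minutes to convert 7 months of minute data (that is, 7 minute Access databases), with over 2 million rows in each Access database.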
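
As a rough throughput figure, taking exactly 2 million rows per database (a lower bound, since each one has more):

```latex
\frac{7 \times 2\,000\,000\ \text{rows}}{38 \times 60\ \text{s}}
  = \frac{14\,000\,000\ \text{rows}}{2\,280\ \text{s}}
  \approx 6\,140\ \text{rows/s}
```

That is below the roughly 8,200 rows/s (491,000 rows in about 1 minute) the original design reached on the small database, but the rate holds across all 14+ million rows instead of collapsing near 477,000.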

answered 2013-08-16T20:12:03.970