1

所以...我有一个 SSIS 包,它在内存中创建和运行 SSIS 包。“元”包创建发生在设置为目标的 DataFlow 脚本组件内部。基本要点是从一个或多个 Oracle 源中提取数据并将其放入 SQL Server Staging Databased(用于 BI)中。除了尝试使用 DB Link(又名 SCHEMA.TABLE_NAME@OTHER)从 Oracle 源中提取数据时,我在包方面取得了巨大成功。

Oracle Connection Manager 的创建与我没有使用 DB Link 时没有什么不同。我使用的 Select 语句(包含 DB Link 语法)是在我运行 SSIS 包之前创建的。

当我尝试运行包含这些选择语句之一的“元”包时,我收到错误,因为使用 OLE DB SQL Server无法识别的语法。这意味着,它不是使用我的源连接管理器的 Oracle 特性,而是使用我的 SQL Server 目标连接管理器或忘记它是一个 Oracle 连接管理器。

同样,唯一的区别是 SELECT 语句中的 DB Link 语法。这就是字面意思

我尝试重新排列我的代码,以便在需要连接管理器之前创建它们。但这并不能解决问题(只是介绍更多)。我已经尝试使用基础外包装中的连接管理器。但这也有它自己的挑战。我什至试图在需要连接管理器之前“重新初始化”连接管理器,但这似乎没有任何作用。

我正在使用 SSIS 2017。Oracle 数据库的版本各不相同,但都是 11 及以上版本。

有没有人经历过这种情况,如果有,你是如何解决的?

我在下面包含了一部分代码;我不得不修剪一些东西以适应这篇文章。

脚本的主要部分(public override void Input0_ProcessInputRow)分为三个主要部分:一个检查表的大小并确定是否只截断和加载;一个部分(一旦确定不是截断和加载)确定是否需要删除比较;以及确定是否需要处理更新/插入的部分。我几乎完整地包含了截断和加载部分,这几乎贯穿了整个逻辑。

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Pipeline;
using System.Data.SqlClient;
#endregion

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    private bool fireAgain = true;
    private bool continueProcessing = true;
    private string processingErrorMessage;
    private string pathName;

    private ConnectionManager srcConMgr;
    private ConnectionManager dstConMgr;

    public override void PreExecute()
    {
        base.PreExecute();
        ComponentMetaData.FireInformation(10, $"Begining Pre Execute of Threaded Path", "Begining Pre Execute of First Threaded Path", "", 0, fireAgain);
    }

    public override void PostExecute()
    {
        base.PostExecute();
        if (continueProcessing)
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"Begining Post Execute of {pathName}", "", 0, fireAgain);
        }
        else
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"There were Processing Errors along {pathName}:  {processingErrorMessage}.  Table was skipped for processing.", "", 0, fireAgain);
        }
    }

    string BlobColumnToString(BlobColumn blobColumn)
    {
        if (blobColumn.IsNull)
            return string.Empty;

        var blobLength = Convert.ToInt32(blobColumn.Length);
        var blobData = blobColumn.GetBlobData(0, blobLength);
        var stringData = System.Text.Encoding.Unicode.GetString(blobData);

        return stringData;
    }

    ConnectionManager CreateConnectionManager(Microsoft.SqlServer.Dts.Runtime.Package package, string conString, string conName, string conDescription)
    {
        ConnectionManager bldConnectionManager = package.Connections.Add("OLEDB");
        bldConnectionManager.ConnectionString = conString;
        bldConnectionManager.Name = conName;
        bldConnectionManager.Description = conDescription;

        bldConnectionManager.AcquireConnection(null);

        return bldConnectionManager;
    }

    static void ReInitializeConnectionManager(ConnectionManager conMgr, string conString)
    {
        conMgr.ConnectionString = conString;

        conMgr.AcquireConnection(null);
    }

    Executable CreateExecutable(Microsoft.SqlServer.Dts.Runtime.Package package, string exType)
    {
        Executable e = package.Executables.Add(exType);

        return e;
    }

    MainPipe CreateDataFlowTask(Executable executable, string dfName)
    {
        Microsoft.SqlServer.Dts.Runtime.TaskHost thMainPipe = executable as Microsoft.SqlServer.Dts.Runtime.TaskHost;
        thMainPipe.Name = dfName;
        MainPipe dataFlowTask = thMainPipe.InnerObject as MainPipe;
        (dataFlowTask as IDTSPipeline130).AutoAdjustBufferSize = true;

        return dataFlowTask;
    }

    IDTSComponentMetaData100 CreateOLEDBComponent(Microsoft.SqlServer.Dts.Runtime.Application app, MainPipe dataFlowTask, string componentName, bool createSource)
    {
        //Create the DataFlow Task
        IDTSComponentMetaData100 oleComponent = dataFlowTask.ComponentMetaDataCollection.New();
        oleComponent.Name = componentName;

        if (createSource == true)
        {
            oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;
        }
        else
        {
            oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Destination"].CreationName;
        }

        return oleComponent;
    }

    CManagedComponentWrapper CreateOLEDBSourceDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 source, ConnectionManager sourceConnectionManager, string sourceSQL)
    {
        //Get the design-time instance of the component.
        CManagedComponentWrapper srcDesignTime = source.Instantiate();

        //Initialize the component
        srcDesignTime.ProvideComponentProperties();

        //Map the component to a connection manager
        source.RuntimeConnectionCollection[0].ConnectionManagerID = sourceConnectionManager.ID;
        source.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[0]);

        //Set the OLE DB Source properties
        srcDesignTime.SetComponentProperty("AccessMode", 2);
        srcDesignTime.SetComponentProperty("SqlCommand", sourceSQL);

        // Reinitialize the metadata
        srcDesignTime.AcquireConnections(null);
        srcDesignTime.ReinitializeMetaData();
        srcDesignTime.ReleaseConnections();

        return srcDesignTime;
    }

    CManagedComponentWrapper CreateOLEDBDestinationDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 destination, ConnectionManager destinationConnectionManager, string stagingAlias)
    {
        CManagedComponentWrapper destDesignTime = destination.Instantiate();
        destDesignTime.ProvideComponentProperties();

        destination.RuntimeConnectionCollection[0].ConnectionManagerID = destinationConnectionManager.ID;
        destination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[1]);

        destDesignTime.SetComponentProperty("AccessMode", 3);
        destDesignTime.SetComponentProperty("FastLoadOptions", "TABLOCK");
        destDesignTime.SetComponentProperty("OpenRowset", stagingAlias);

        return destDesignTime;
    }


    Microsoft.SqlServer.Dts.Runtime.TaskHost CreateExecuteSQLTask(Executable e, ConnectionManager connectionManager, string taskName, string sqlStatement)
    {
        Microsoft.SqlServer.Dts.Runtime.TaskHost thExecuteSQL = e as Microsoft.SqlServer.Dts.Runtime.TaskHost;
        thExecuteSQL.Properties["Connection"].SetValue(thExecuteSQL, connectionManager.ID);
        thExecuteSQL.Properties["Name"].SetValue(thExecuteSQL, taskName);
        thExecuteSQL.Properties["SqlStatementSource"].SetValue(thExecuteSQL, sqlStatement);

        return thExecuteSQL;
    }


    static void UpdateStagingMetaDataPostProcess(string connectionString, int tableId, int processedItems, string fromMaxTimeStampString)
    {
        string sqlCommand = $"DECLARE @procTime datetime = GETDATE(); " +
                            $"EXEC Staging.meta.spUpdateInsertsUpdatesDeletesAfterProcessing @tableID = {tableId.ToString()}, " +
                                                                                            $"@processedItems = {processedItems.ToString()}, " +
                                                                                            $"@processedTimeStamp = @procTime, " +
                                                                                            $"@fromMaxTimeStamp = '{fromMaxTimeStampString}';";

        using (SqlConnection connection = new SqlConnection(
            connectionString))
        {
            SqlCommand command = new SqlCommand(sqlCommand, connection);
            command.Connection.Open();
            command.ExecuteNonQuery();
        }

    }

    static void ExecuteStagingSQLCommand(string connectionString, string sqlCommand)
    {
        using (SqlConnection connection = new SqlConnection(
            connectionString))
        {
            SqlCommand command = new SqlCommand(sqlCommand, connection);
            command.Connection.Open();
            command.ExecuteNonQuery();
        }
    }


    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        //removed a bunch of code to fit into post on stack overflow...

        if (continueProcessing)
        {
            string serverName = this.Variables.ServerName;
            string stagingConnectionString = @"server=" + serverName + imNotGivingYouMyConnectionInformation;
            int processedItems = 0;

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the deletesExecutable and insertsUpdatesExecutable variables and assign null for {stagingAlias}", string.Empty, 0, ref fireAgain);
            Executable deletesExecutable = null;
            Executable insertsUpdatesExecutable = null;

            //Create the Application and Package
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Application and Package", string.Empty, 0, ref fireAgain);
            Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
            Microsoft.SqlServer.Dts.Runtime.Package package = new Microsoft.SqlServer.Dts.Runtime.Package();


            Microsoft.SqlServer.Dts.Runtime.Connections pkgConns = package.Connections;

            //Setup the Source Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Source Connection Manager for {stagingAlias}", string.Empty, 0, ref fireAgain);
            srcConMgr = CreateConnectionManager(package, connectionDetails,
                                            connectionName + " OLEDB Connection Manager",
                                            "Connection Manager for " + connectionName);
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Source Connection Manager for {stagingAlias}:  {package.Connections[$"{connectionName} OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);


            //Setup the Destination Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Destination Connection Manager", string.Empty, 0, ref fireAgain);
            dstConMgr = CreateConnectionManager(package, destinationConnectionDetails,
                                            "Staging OLEDB Connection Manager",
                                            "Connection Manager for Staging.");
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Destination Connection Manager for {stagingAlias}:  {package.Connections["Staging OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Connection Manager Count for {stagingAlias}:  {package.Connections.Count}", string.Empty, 0, ref fireAgain);


            //Determine what kind of staging transaction this will be (Trunc and Load or Update/Insert and Delete)
            if (stagingClassification == "S" || this.Variables.ForceTruncateAndLoad == true) //this is a small, trunc and load table OR the user has elected to trunc and load everything that needs trunc'd and loaded yo
            {
                try
                {
                    processedItems = 14; //we're updating inserts, updates, and deletes

                    //we're going to trunc and load here
                    //Create the Load pipeline, a.k.a., DataFlow task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Trunc and Load pipeline, a.k.a., DataFlow task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable tl_e = CreateExecutable(package, "STOCK:PipelineTask");
                    MainPipe tl_dataFlowTask = CreateDataFlowTask(tl_e, "Trunc And Load");

                    //Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution", string.Empty, 0, ref fireAgain);
                    ComponentEventHandler tl_events = new ComponentEventHandler();
                    tl_dataFlowTask.Events = DtsConvert.GetExtendedInterface(tl_events as IDTSComponentEvents);

                    ReInitializeConnectionManager(srcConMgr, connectionDetails);
                    //Create the OLEDB Source DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB Source DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_source = CreateOLEDBComponent(app, tl_dataFlowTask, "OLEDBSource", true);
                    CManagedComponentWrapper tl_srcDesignTime = CreateOLEDBSourceDesignTimeInstance(package, tl_source, srcConMgr, tableSQL);


                    ReInitializeConnectionManager(dstConMgr, destinationConnectionDetails);
                    //Create the OLEDB destination DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB destination DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_destination = CreateOLEDBComponent(app, tl_dataFlowTask, "OleDBDestination", false);
                    CManagedComponentWrapper tl_destDesignTime = CreateOLEDBDestinationDesignTimeInstance(package, tl_destination, dstConMgr, $"dbo.{stagingAlias}");


                    //Create the path between the two DataFlow Tasks
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the path between the two DataFlow Tasks for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSPath100 tl_path = tl_dataFlowTask.PathCollection.New();
                    tl_path.AttachPathAndPropagateNotifications(tl_source.OutputCollection[0], tl_destination.InputCollection[0]);


                    //Configure the Destination's Meta Data
                    //############################################################
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Get the destination's default input and virtual input
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Get the destination's default input and virtual input", string.Empty, 0, ref fireAgain);
                    IDTSInput100 tl_input = tl_destination.InputCollection[0];
                    IDTSVirtualInput100 tl_vInput = tl_input.GetVirtualInput();
                    IDTSVirtualInputColumnCollection100 tl_vInputColumns = tl_vInput.VirtualInputColumnCollection;

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Initialize the destination dataflow
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Initialize the destination dataflow", string.Empty, 0, ref fireAgain);
                    tl_destDesignTime.AcquireConnections(null);
                    tl_destDesignTime.ReinitializeMetaData();
                    tl_destDesignTime.ReleaseConnections();

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Iterate through the virtual input column collection and map to destination
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Iterate through the virtual input column collection and map to destination for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    foreach (IDTSVirtualInputColumn100 tl_vColumn in tl_vInputColumns)
                    {
                        var inputColumn = tl_destDesignTime.SetUsageType(tl_input.ID, tl_vInput, tl_vColumn.LineageID, DTSUsageType.UT_READONLY);
                        var externalColumn = tl_input.ExternalMetadataColumnCollection[inputColumn.Name];
                        tl_destDesignTime.MapInputColumn(tl_input.ID, inputColumn.ID, externalColumn.ID);
                    }
                    //############################################################


                    //Create the Truncate Execute SQL Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Truncation Execute SQL Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable trunc_e = CreateExecutable(package, "STOCK:SQLTask");
                    Microsoft.SqlServer.Dts.Runtime.TaskHost thTruncate = CreateExecuteSQLTask(trunc_e, dstConMgr, $"TRUNCATE {stagingAlias}", $"TRUNCATE TABLE dbo.{stagingAlias}");

                    //Create the Precedence Constraint between the Execute SQL Task and the Pipeline Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the precedence constraint between Execute SQL Task and DataFlow for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint tl_Constraint = package.PrecedenceConstraints.Add(trunc_e, tl_e);
                }
                catch (Exception tl_exc)
                {

                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Custom Component Event Type:  {CustomComponentEvent.type}, Sub Component:  {CustomComponentEvent.subComponent}, Description:  {CustomComponentEvent.description}", string.Empty, 0);
                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Error Code:  {tl_exc.HResult}, Error Message:  {tl_exc.Message}, Source Table SQL:  {tableSQL}, Source Connection Details:  {srcConMgr.ConnectionString}", string.Empty, 0);
                    continueProcessing = false;
                }


            }
            else
            {
                //removed a bunch of code for pasting into stack overlfow

非常感谢你的帮助!

4

0 回答 0