在这里回答我自己的问题。
设想了两种可能的方法。
1. 使用脚本组件(在数据流步骤中)
在脚本组件中,我可以直接通过 ADO.NET 发出 SET ... 命令。
这种方法的问题是我无法为后续(或并行,在同一数据流中)ADO.NET 源组件保留运行这些命令的连接。
试图借助事务来固定使用某个特定连接也行不通,因为这些命令必须在任何正在进行的事务之外发出。
所以最终我还必须从这个组件中发出源选择,即使这样也不太理想,因为随后的目标插入操作可能不会与源选择在同一个事务中登记。
使用这种方法的最终解决方案是:
- 使用 MVCC,将数据从源视图复制到源系统上的临时暂存表(staging table)中。
- 然后使用事务,将数据从源暂存表读取到目标系统。
代码如下(注意:必须显式添加对 Ingres .NET Data Provider\v2.1\Ingres.Client.dll 的引用):
/* Microsoft SQL Server Integration Services Script Component
* Write scripts using Microsoft Visual C# 2008.
* ScriptMain is the entry point class of the script.*/
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Ingres.Client;
using System.Collections.Generic;
/// <summary>
/// SSIS script component used as a data-flow source. During <see cref="PreExecute"/> it
/// issues session-level SET commands against an Ingres connection and then refreshes the
/// "<c>{view}_STAGING</c>" table from the view named by the <c>vViewName</c> package variable.
/// It emits a single empty output row so that the data flow has something to process.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    // When true, progress messages are raised as SSIS information events (see debugMessage).
    private bool debug = true;

    // Connection manager configured on the component, and the Ingres connection acquired from it.
    private IDTSConnectionManager100 cm;
    private IngresConnection conn;

    /// <summary>
    /// Acquires the Ingres connection from the component's connection manager.
    /// </summary>
    /// <param name="Transaction">The transaction object handed in by the SSIS runtime; passed straight through to the connection manager.</param>
    public override void AcquireConnections(object Transaction)
    {
        // The connection manager used here must be configured in the Script Component editor's "Connection Managers" page.
        // "Connection" is the (default) strongly typed name for the first connection added.
        // In this case, it needs to be a reference to the xxxxx connection manager (by convention it should be "xxxxx_ADONET").
        cm = this.Connections.Connection;
        conn = (IngresConnection)cm.AcquireConnection(Transaction);
    }

    /// <summary>
    /// Builds the list of SQL statements (optional MVCC lock mode, isolation level,
    /// truncate staging table, repopulate staging table from the view) and executes
    /// them one after another, in that order, on the acquired connection.
    /// </summary>
    public override void PreExecute()
    {
        debugMessage("PreExecute", "Started");
        base.PreExecute();

        // NOTE(review): the view name is spliced into the SQL text as an identifier, which
        // cannot be bound as a parameter. It presumably comes from trusted package
        // configuration - confirm before exposing vViewName to untrusted input.
        string viewName = Variables.vViewName;

        List<string> sqlCommandStrings = new List<string>();
        if (Variables.vUseIngresMVCC)
        {
            sqlCommandStrings.Add("SET LOCKMODE SESSION WHERE LEVEL = MVCC");
        }
        sqlCommandStrings.Add("SET SESSION ISOLATION LEVEL READ COMMITTED");
        sqlCommandStrings.Add(String.Format("MODIFY {0}_STAGING TO TRUNCATED", viewName));
        sqlCommandStrings.Add(String.Format("INSERT INTO {0}_STAGING SELECT * FROM {0}", viewName));

        foreach (string sqlCommandString in sqlCommandStrings)
        {
            debugMessage("PreExecute", "Executing: '{0}'", sqlCommandString);

            // Dispose each command as soon as it has run so that command objects are not
            // left for the finalizer; the connection itself stays open.
            using (IngresCommand command = conn.CreateCommand())
            {
                command.CommandText = sqlCommandString;
                int rowsAffected = command.ExecuteNonQuery();

                // A negative count is reported as "No" rows rather than as a number.
                string rowsAffectedString = rowsAffected >= 0 ? rowsAffected.ToString() : "No";
                debugMessage("PreExecute", "Command executed OK, {0} rows affected.", rowsAffectedString);
            }
        }

        debugMessage("PreExecute", "Finished");
    }

    /// <summary>
    /// Emits one output row and sets its <c>CompletedOK</c> column to true.
    /// </summary>
    public override void CreateNewOutputRows()
    {
        // SSIS requires that we output at least one row from this source script.
        Output0Buffer.AddRow();
        Output0Buffer.CompletedOK = true;
    }

    /// <summary>
    /// Runs the base post-execute step. The connection is deliberately not released here (see note below).
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();
        // NB While it is "best practice" to release the connection here, doing so with an Ingres connection will cause a COM exception.
        // This exception kills the SSIS BIDS designer such that you'll be unable to edit this code through that tool.
        // Re-enable the following line at your own peril.
        //cm.ReleaseConnection(conn);
    }

    /// <summary>
    /// Raises an SSIS information event of the form "<paramref name="method"/>: message"
    /// when the <c>debug</c> field is true; does nothing otherwise.
    /// </summary>
    /// <param name="method">Name of the calling method, used as the message prefix.</param>
    /// <param name="messageFormat">Composite format string for the message.</param>
    /// <param name="messageArgs">Arguments substituted into <paramref name="messageFormat"/>.</param>
    private void debugMessage(string method, string messageFormat, params object[] messageArgs)
    {
        if (this.debug)
        {
            string message = string.Format(messageFormat, messageArgs);
            string description = string.Format("{0}: {1}", method, message);
            bool fireAgain = true;
            this.ComponentMetaData.FireInformation(0, this.ComponentMetaData.Name, description, "", 0, ref fireAgain);
        }
    }
}