我创建了一个自定义 SSIS 目标组件来写入 MQ。我正在更改代码,因为我正在使用测试 SSIS 包来确认它是否有效。
我已经完成了将消息映射到输入列的所有工作。都好。但是我在 ProvideComponentProperties 中创建 ExternalMetadataColumnCollection。当我开始一个新的测试包时,我才意识到这是错误的调用它的地方。现在,当我尝试将新组件添加到包中时,出现错误
Error at Data Flow Task [SSIS.Pipeline]: The property is read-only.
Error at Data Flow Task [IBM MQ Destination Jo [6]]: System.Runtime.InteropServices.COMException (0xC0204013): Exception from HRESULT: 0xC0204013
at Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSExternalMetadataColumn100.set_ID(Int32 pID)
at IBMWebsphereMQ.DestinationAdapters.IbmMQDestinationAdapter.CreateInputAndMetaDataColumns (IDTSInput100 input)
at
IBMWebsphereMQ.DestinationAdapters.IbmMQDestinationAdapter.ProvideComponentProperties() 在 Microsoft.SqlServer.Dts.Pipeline.ManagedComponentHost.HostProvideComponentProperties(IDTSManagedComponentWrapper100 包装器)
显然,ProvideComponentProperties 不是创建 ExternalMetaData 列的正确位置。我应该在哪里调用它?
下面是组件的代码
using System;
using System.Collections;
using System.Runtime.InteropServices;
using System.Transactions;
using MyProject.IBMWebsphereMQ.ConnectionManagers;
using MyProject.IBMWebsphereMQ.Framework;
using MyProject.IBMWebsphereMQ.Interfaces;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using MyProject.IBMWebsphereMQ.MQ;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace MyProject.IBMWebsphereMQ.DestinationAdapters
{
[DtsPipelineComponent(ComponentType = ComponentType.SourceAdapter,
DisplayName = "IBM MQ Destination Jo",
Description = "Writes messages to the defined queue",
IconResource = "Dts.Pipeline.SSISSourceAdapter.source.ico",
CurrentVersion = 1
)]
public class IbmMQDestinationAdapter : PipelineComponent, IEnlistmentNotification
{
private const string QueuePropertyName = "Queue";
private const string BatchWriteSize = "BatchWriteSize";
private const string ConnectionName = "IBM MQ connection";
private const string MessageInputName = "Message";
private const string ColumnName = "Message";
private const string InputDescription = "Input for IbmMQDestinationAdapter";
private IManagedConnection connection;
private IQueue queue;
private ArrayList columnInfos = new ArrayList();
private bool reRaise = true;
private IDTSInput100 messageInput;
#region ColumnInfo
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
#endregion
private string QueueName
{
get { return GetProperty<string>(QueuePropertyName); }
}
private int BatchSize
{
get { return GetProperty<int>(BatchWriteSize); }
}
public override void ProvideComponentProperties()
{
TraceLogging.WriteTrace(this, "Executing ProvideComponentProperties");
//base.ProvideComponentProperties();
////Clear out base implementation
//ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
//ComponentMetaData.InputCollection.RemoveAll();
//ComponentMetaData.OutputCollection.RemoveAll();
//Provide the inital components.
RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
ComponentMetaData.UsesDispositions = true;
SetComponentAttribute();
messageInput = ComponentMetaData.InputCollection.New();
messageInput.Name = MessageInputName;
messageInput.Description = InputDescription;
messageInput.HasSideEffects = true;
messageInput.ExternalMetadataColumnCollection.IsUsed = true;
TraceLogging.WriteTrace(this, "Creating InputColCollection");
IDTSInputColumn100 inCol = messageInput.InputColumnCollection.New();
TraceLogging.WriteTrace(this, "Setting incol.Name {0}" , ColumnName);
inCol.Name = ColumnName;
CreateInputAndMetaDataColumns(messageInput);
DtsExtensions.CreateCustomProperty(ComponentMetaData.CustomPropertyCollection.New(), QueuePropertyName, "The name of the queue to connect to", true, "QueueName");
//DtsExtensions.CreateCustomProperty(ComponentMetaData.CustomPropertyCollection.New(), BatchWriteSize, "Maximum number of messages to dequeue(0=all)", true, 5000);
//Reserve space for the connection manager
IDTSRuntimeConnection100 connectionSite = ComponentMetaData.RuntimeConnectionCollection.New();
connectionSite.Name = ConnectionName;
}
public override DTSValidationStatus Validate()
{
bool pbCancel = false;
if (ComponentMetaData.OutputCollection.Count != 0)
{
ComponentMetaData.FireError(0, ComponentMetaData.Name, "Unexpected Output found. Destination components do not support outputs",
"", 0, out pbCancel);
return DTSValidationStatus.VS_ISCORRUPT;
}
if (ComponentMetaData.AreInputColumnsValid == false)
{
ComponentMetaData.InputCollection["ComponentInput"].InputColumnCollection.RemoveAll();
return DTSValidationStatus.VS_NEEDSNEWMETADATA;
}
////What about if we have input columns but we have no ExternalMetaData
////columns? Maybe somebody removed them through code.
//IDTSInput100 input = ComponentMetaData.InputCollection[MessageInputName];
//if (DoesEachInputColumnHaveAMetaDataColumnAndDoDatatypesMatch(input.ID) ==false)
//{
// ComponentMetaData.FireError(0, "Validate", "input columns and metadata columns are out of sync. Making call to ReinitializeMetaData", "", 0, out pbCancel);
// return DTSValidationStatus.VS_NEEDSNEWMETADATA;
//}
return base.Validate();
}
private bool DoesEachInputColumnHaveAMetaDataColumnAndDoDatatypesMatch(int inputID)
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
IDTSExternalMetadataColumn100 mdc;
bool rtnVal = true;
foreach (IDTSInputColumn100 col in input.InputColumnCollection)
{
if (col.ExternalMetadataColumnID == 0)
{
rtnVal = false;
}
else
{
mdc =
input.ExternalMetadataColumnCollection[col.ExternalMetadataColumnID];
if (mdc.DataType != col.DataType || mdc.Length != col.Length ||
mdc.Precision != col.Precision || mdc.Scale != col.Scale || mdc.CodePage !=
col.CodePage)
{
rtnVal = false;
}
}
}
return rtnVal;
}
public override void OnInputPathAttached(int inputID)
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
IDTSVirtualInput100 vInput = input.GetVirtualInput();
foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)
{
this.SetUsageType(inputID, vInput, vCol.LineageID, DTSUsageType.UT_READONLY);
}
}
public override void PreExecute()
{
TraceLogging.WriteTrace(this, "PreExecute");
IDTSInput100 input = ComponentMetaData.InputCollection[MessageInputName];
// CreateInputAndMetaDataColumns(input );
TraceLogging.WriteTrace(this, "PreExecute after getting Inputcollection");
ComponentMetaData.FireInformation(0, "PreExecute about to loop", "Start loop of columns", null, 0, ref reRaise);
TraceLogging.WriteTrace(this, "PreExecute Loop");
foreach (IDTSInputColumn100 inCol in input.InputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(input.Buffer, inCol.LineageID);
ci.ColumnName = inCol.Name;
TraceLogging.WriteTrace(this, " PreExecute columnInfo Name: {0}", inCol.Name);
columnInfos.Add(ci);
}
}
public override void ReinitializeMetaData()
{
IDTSInput100 _profinput = ComponentMetaData.InputCollection[MessageInputName];
if (_profinput.ExternalMetadataColumnCollection.Count > 0)
{
_profinput.ExternalMetadataColumnCollection.RemoveAll();
}
if (_profinput.InputColumnCollection.Count > 0)
{
_profinput.InputColumnCollection.RemoveAll();
}
CreateMetaDataColumns(_profinput);
}
private void CreateMetaDataColumns(IDTSInput100 input)
{
TraceLogging.WriteTrace(this, "CreateMetaDataColumns");
IDTSExternalMetadataColumnCollection100 extCols = input.ExternalMetadataColumnCollection;
TraceLogging.WriteTrace(this, "Got extCols");
//IDTSInputColumn100 inCol = messageInput.InputColumnCollection.New();
//inCol.Name = ColumnName;
foreach (IDTSInputColumn100 inCol in input.InputColumnCollection)
{
TraceLogging.WriteTrace(this, "For Each ColumnName = {0}", inCol.Name);
if (inCol.Name == ColumnName)
{
TraceLogging.WriteTrace(this, "Create new ExtCol");
IDTSExternalMetadataColumn100 extCol = extCols.New();
TraceLogging.WriteTrace(this, "Set them to input col");
extCol.Name = inCol.Name;
extCol.ID = inCol.ID;
extCol.DataType = inCol.DataType;
extCol.Length = inCol.Length;
extCol.Precision = inCol.Precision;
extCol.Scale = inCol.Scale;
extCol.CodePage = inCol.CodePage;
}
}
}
public override void AcquireConnections(object transaction)
{
if (transaction != null && connection.Configuration.AcknowledgementMode == Acknowledgement.Session)
{
ComponentMetaData.FireInformation(0, "AcquireConnections()", "Enlist in transaction", null, 0,
ref reRaise);
//If the connection mode is session and there is a valid transaction then enlist.
IntPtr pUnk = Marshal.GetIUnknownForObject(transaction);
var dtcTrans = (IDtcTransaction)Marshal.GetTypedObjectForIUnknown(pUnk, typeof(IDtcTransaction));
Transaction managedTrans = TransactionInterop.GetTransactionFromDtcTransaction(dtcTrans);
managedTrans.EnlistVolatile(this, EnlistmentOptions.EnlistDuringPrepareRequired);
}
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
connection = (IManagedConnection)conn.ConnectionManager.AcquireConnection(transaction);
if (connection is IbmMQConnection)
{
ComponentMetaData.FireInformation(0, "AcquireConnections()", "Connecting", null, 0, ref reRaise);
//Connect if not connected
if (!connection.IsConnected())
connection.Connect();
if (connection.IsConnected())
{
ComponentMetaData.FireInformation(0, "AcquireConnections()", "Connected.", null, 0, ref reRaise);
ComponentMetaData.FireInformation(0, "AcquireConnections()", "Connecting to queue", null, 0, ref reRaise);
queue = new IbmMQQueue(connection) { QueueName = QueueName };
queue.Open();
}
}
}
public override void ReleaseConnections()
{
if (queue != null)
queue.Close();
base.ReleaseConnections();
}
//public void CreateExternalMetaDataColumn(IDTSInput100 input, int inputColumnID)
//{
// IDTSInputColumn100 oColumn = input.InputColumnCollection.GetObjectByID(inputColumnID);
// IDTSExternalMetadataColumn100 eColumn = input.ExternalMetadataColumnCollection.New();
// eColumn.DataType = oColumn.DataType;
// eColumn.Precision = oColumn.Precision;
// eColumn.Scale = oColumn.Scale;
// eColumn.Length = oColumn.Length;
// eColumn.CodePage = oColumn.CodePage;
// oColumn.ExternalMetadataColumnID = eColumn.ID;
//}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
ComponentMetaData.FireInformation(0, "ProcessInput Jo", "Beginning Process Input", null, 0, ref reRaise);
if (!buffer.EndOfRowset)
{
while (buffer.NextRow())
{
ComponentMetaData.FireInformation(0, "ProcessInput()", "Beginning loop of columnns", null, 0,
ref reRaise);
TraceLogging.WriteTrace(this, "Process Input columninfos,count = {0}", columnInfos.Count);
var message = new Message();
for (int i = 0; i < columnInfos.Count; i++)
{
ColumnInfo ci = (ColumnInfo) columnInfos[i];
object o = buffer[ci.BufferColumnIndex];
if (o == null)
{
TraceLogging.WriteTrace(this, "o is null");
TraceLogging.WriteTrace(this, "ColumnName: {0} ", ci.ColumnName);
}
else
{
TraceLogging.WriteTrace(this, "testing trace {0}: ", "Jo");
TraceLogging.WriteTrace(this, "ColumnName: {0}", ci.ColumnName);
TraceLogging.WriteTrace(this, "Column Value: {0}", buffer[ci.BufferColumnIndex].ToString());
if (ci.ColumnName == ColumnName)
{
TraceLogging.WriteTrace(this, "Setting MessageContents");
message.Contents = buffer[ci.BufferColumnIndex].ToString();
}
else
{
TraceLogging.WriteTrace(this, "Setting MessageId");
message.MessageId = buffer[ci.BufferColumnIndex].ToString();
}
}
}
queue.Write(message);
//string messageContents = buffer.GetString(m_BlobColumnIndex);
//TraceLogging.WriteTrace(this, "Message contents", messageContents);
//var message = new Message("1", messageContents);
//ComponentMetaData.FireInformation(0, "ProcessInput()", "Beginning queue write set contents", null, 0,
// ref reRaise);
//queue.Write(message);
ComponentMetaData.FireInformation(0, "ProcessInput()", "after queue write", null, 0, ref reRaise);
connection.Commit();
}
}
}
private void SetComponentAttribute()
{
var componentAttribute =
(DtsPipelineComponentAttribute)
Attribute.GetCustomAttribute(GetType(), typeof(DtsPipelineComponentAttribute), false);
int currentVersion = componentAttribute.CurrentVersion;
ComponentMetaData.Version = currentVersion;
ComponentMetaData.ContactInfo = ContactInfo;
}
private T GetProperty<T>(string propertyName)
{
foreach (IDTSCustomProperty100 possibleProperty in ComponentMetaData.CustomPropertyCollection)
{
if (possibleProperty.Name == propertyName)
{
return (T)possibleProperty.Value;
}
}
return default(T);
}
#region IEnlistmentNotification Members
public void Commit(Enlistment enlistment)
{
ComponentMetaData.FireInformation(0, "Commit(Enlistment enlistment)", "Beginning commit", null, 0, ref reRaise);
connection.Commit();
ComponentMetaData.FireProgress("Committed", 100, 0, 0, "IbmMQSourceAdapter", ref reRaise);
enlistment.Done();
}
public void InDoubt(Enlistment enlistment)
{
}
public void Prepare(PreparingEnlistment preparingEnlistment)
{
if (connection != null)
{
preparingEnlistment.Prepared();
// TraceLogging.WriteTrace(this, "Transaction Preparing");
}
}
public void Rollback(Enlistment enlistment)
{
connection.Rollback();
// TraceLogging.WriteTrace(this, "Rollback Executed");
ComponentMetaData.FireWarning(0, "Rollback(Enlistment enlistment)", "Rolling back transaction", null, 0);
enlistment.Done();
}
#endregion
}
}