这是我的解决方案。不是同步调用包,而是异步调用,并在调用代码中使用 ManualResetEvent 来阻塞主线程,直到包完成。
namespace Anthony.Staging.Integration.Tests
{
using System;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Microsoft.SqlServer.Management.Common;
using Microsoft.VisualStudio.TestTools.UnitTesting;
//Add references via %WINDIR%\assembly\GAC_MSIL\:
/* Microsoft.SqlServer.ConnectionInfo
* Microsoft.SqlServer.Management.InterationServices
* Microsoft.SqlServer.Management.Sdk.Sfc
* Microsoft.SqlServer.Smo
*/
using Microsoft.SqlServer.Management.IntegrationServices;
[TestClass]
public class ConnectionTests
{
private const string connectionString = @"Data Source=Anthony\MSSQL11;Initial Catalog=SSISDB;Integrated Security=SSPI;";
private static SqlConnection connection;
private static ServerConnection conn;
private static IntegrationServices ssis;
private static string catalog = "SSISDB";
private static string folder = "Anthony.Integration";
[TestInitialize]
public void MyTestInitialize()
{
connection = new SqlConnection(connectionString);
ssis = new IntegrationServices(connection);
}
[TestMethod]
public void CanConnect()
{
Assert.IsNotNull(connection);
Assert.IsTrue(connection.State == ConnectionState.Open);
}
[TestMethod]
public void CanGetKnownPackage()
{
try
{
var package = this.GetPackage(catalog, folder, "Anthony.Staging", "_Extract_Full.dtsx");
Assert.IsNotNull(package);
}
catch (Exception ex)
{
Assert.Fail(ex.Message);
}
}
[TestMethod]
public void CanExecuteKnownPackage()
{
const int MAX_RETRIES = 20;
const int RETRY_INTERVAL_SECONDS = 10;
const int OPERATION_TIMEOUT_MINUTES = 10;
// get the package from the SSISCatalog and start it asynchronously
// because starting it synchronously will time out after 30 seconds.
var package = this.GetPackage(catalog, folder, "Anthony.Staging", "_Extract_Full.dtsx");
var executionIdentifier = package.Execute(false, null);
// block the main thread and kick off a timer immediately which checks execution status for an ssis execution identifier.
var mre = new ManualResetEvent(false);
var statusChecker = new StatusChecker(executionIdentifier, MAX_RETRIES);
var timer = new Timer(statusChecker.CheckStatus, new StatusCheckerState(mre), new TimeSpan(0, 0, 0), new TimeSpan(0, 0, RETRY_INTERVAL_SECONDS));
WaitHandle.WaitAny(new WaitHandle[] { mre }, (int)new TimeSpan(0, OPERATION_TIMEOUT_MINUTES, 0).TotalMilliseconds, false);
mre.Dispose();
timer.Dispose();
// get the results
var execution = ssis.Catalogs.Single(x => x.Name.Equals(catalog)).Executions.Single(x => x.Id.Equals(executionIdentifier));
var errors = execution.Messages.Where(m => m.MessageType == 120).Select(m => m.Message);
var warnings = execution.Messages.Where(m => m.MessageType == 110).Select(m => m.Message);
Assert.AreEqual(0, errors.Count());
Assert.AreEqual(Operation.ServerOperationStatus.Success, execution.Status);
}
class StatusCheckerState
{
public StatusCheckerState(ManualResetEvent waitHandle)
{
this.WaitHandle = waitHandle;
}
public ManualResetEvent WaitHandle { get; private set; }
}
class StatusChecker
{
private readonly long executionIdentifier;
private int invokeCount;
private ManualResetEvent waitHandle;
private readonly int maximumCount;
public StatusChecker(long executionIdentifier, int maxCount)
{
this.executionIdentifier = executionIdentifier;
invokeCount = 0;
maximumCount = maxCount;
}
// This method is called by the timer delegate.
public void CheckStatus(object state)
{
var localState = ((StatusCheckerState)state);
this.waitHandle = localState.WaitHandle;
if (invokeCount > 0)
{
Debug.WriteLine("Retry attempt: {0}", invokeCount);
}
invokeCount++;
if (invokeCount == maximumCount)
{
// Reset the counter and signal Main.
invokeCount = 0;
waitHandle.Set();
}
var execution = new IntegrationServices(connection).Catalogs.Single(x => x.Name.Equals(catalog)).Executions.Single(x => x.Id.Equals(executionIdentifier));
Debug.WriteLine("Status of execution " + executionIdentifier + " is " + execution.Status);
if (execution.Status == Operation.ServerOperationStatus.Success)
{
// Reset the counter and signal Main.
invokeCount = 0;
waitHandle.Set();
}
}
}
[TestCleanup]
public void MyTestCleanup()
{
if(connection.State == ConnectionState.Open)
connection.Close();
ssis = null;
}
private PackageInfo GetPackage(string catalog, string folder, string project, string package)
{
if (ssis.Catalogs.Contains(catalog))
{
var cat = ssis.Catalogs[catalog];
if (cat.Folders.Contains(folder))
{
var fold = cat.Folders[folder];
if (fold.Projects.Contains(project))
{
var proj = fold.Projects[project];
if (proj.Packages.Contains(package))
{
var pkg = proj.Packages[package];
return pkg;
}
}
}
}
throw new Exception("Cannot find package!");
}
}
}