我想通过一些特定的活动从工作流访问服务总线队列和主题。
我找不到适合这种情况的任何东西(这篇 MSDN 文章和Roman Kiss 的这篇文章)是最接近的。
我想设计一个自定义活动,它使用 QueueClient 异步接收代理消息,使用通过 async/await 模式实现的 BeginReceive 方法(请参阅我的问题)。
首先,我想问一下是否有任何理由让我更喜欢建议的方法(改编的 WCF)而不是我想要的方法(使用 QueueClient)。
然后,我将不胜感激以一种持久友好的方式设计它。
更新:
这是我到目前为止所尝试的:
public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
[RequiredArgument]
public InArgument<string> ConnectionString { get; set; }
[RequiredArgument]
public InArgument<string> Path { get; set; }
protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
var connectionString = this.ConnectionString.Get(context);
var path = this.Path.Get(context);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
var cts = new CancellationTokenSource();
context.UserState = new ReceiveState
{
CancellationTokenSource = cts,
QueueClient = queueClient
};
var task = ExecuteAsync(context, cts.Token);
var tcs = new TaskCompletionSource<BrokeredMessage>(state);
task.ContinueWith(
t =>
{
if (t.IsFaulted)
{
tcs.TrySetException(t.Exception.InnerExceptions);
}
else if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(t.Result);
}
if (callback != null)
{
callback(tcs.Task);
}
});
return tcs.Task;
}
protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
var task = (Task<BrokeredMessage>)result;
try
{
return task.Result;
}
catch (OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
catch (AggregateException exception)
{
if (exception.InnerException is OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
throw;
}
}
protected override void Cancel(AsyncCodeActivityContext context)
{
var state = (ReceiveState)context.UserState;
state.CancellationTokenSource.Cancel();
}
private async Task<BrokeredMessage> ExecuteAsync(
AsyncCodeActivityContext context, CancellationToken cancellationToken)
{
var receiveState = context.UserState as ReceiveState;
var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
var completionTask = receiveTask.ContinueWith(
t =>
{
BrokeredMessage result;
if (t.IsCanceled)
{
context.MarkCanceled();
result = null;
}
else if (t.IsFaulted)
{
result = null;
}
else
{
t.Result.Complete();
result = t.Result;
}
receiveState.QueueClient.Close();
return result;
},
cancellationToken);
return await completionTask;
}
private class ReceiveState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public QueueClient QueueClient { get; set; }
}
}
并以这种方式测试(使用本地 Windows Server 服务总线):
var connectionString = new Variable<string>
{
Default = connectionStringValue
};
var path = new Variable<string>
{
Default = pathValue
};
var test = new While
{
Body =
new Pick
{
Branches =
{
new PickBranch
{
Trigger =
new AsyncReceiveBrokeredMessage
{
ConnectionString = new InArgument<string>(connectionString),
Path = new InArgument<string>(path)
},
Action =
new WriteLine
{
Text =
"Received message"
}
},
new PickBranch
{
Trigger =
new Delay
{
Duration = TimeSpan.FromSeconds(10)
},
Action =
new WriteLine
{
Text =
"Timeout!"
}
}
}
},
Condition = true,
Variables = { connectionString, path }
};
WorkflowInvoker.Invoke(test);
如果我连续发送消息,我会按预期收到消息。第一次超时后出现问题,因为那时我不再收到任何消息。任何澄清表示赞赏。