19

我想通过一些特定的活动从工作流访问服务总线队列和主题。

我找不到适合这种情况的任何东西(这篇 MSDN 文章Roman Kiss 的这篇文章)是最接近的。

我想设计一个自定义活动,它使用 QueueClient 异步接收代理消息,使用通过 async/await 模式实现的 BeginReceive 方法(请参阅我的问题)。

首先,我想问一下是否有任何理由让我更喜欢建议的方法(改编的 WCF)而不是我想要的方法(使用 QueueClient)。

然后,我将不胜感激以一种持久友好的方式设计它。

更新:

这是我到目前为止所尝试的:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}

并以这种方式测试(使用本地 Windows Server 服务总线):

var connectionString = new Variable<string>
                                   {
                                       Default = connectionStringValue
                                   };
        var path = new Variable<string>
                       {
                           Default = pathValue
                       };
        var test = new While
                       {
                           Body =
                               new Pick
                                   {
                                       Branches =
                                           {
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new AsyncReceiveBrokeredMessage
                                                               {
                                                                   ConnectionString = new InArgument<string>(connectionString),
                                                                   Path = new InArgument<string>(path)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Received message"
                                                               }
                                                   },
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new Delay
                                                               {
                                                                   Duration = TimeSpan.FromSeconds(10)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Timeout!"
                                                               }
                                                   }
                                           }
                                   },
                           Condition = true,
                           Variables = { connectionString, path }
                       };
        WorkflowInvoker.Invoke(test);

如果我连续发送消息,我会按预期收到消息。第一次超时后出现问题,因为那时我不再收到任何消息。任何澄清表示赞赏。

4

3 回答 3

1

首先你需要知道一些重要的事情: 1) 工作流是长时间运行的过程,意味着以后可以暂停和恢复。2) 工作流被唤醒和恢复的方式是书签。3) 通常人们喜欢他们的工作流在暂停的同时也能持久化。(如果您不关心持久性,为什么还要使用 WF - 只是为了视觉设计工具?)

逻辑问题:

如果您的所有工作流及其活动都被持久化和暂停,那么您的活动代码甚至都没有加载,那么谁在监听呢?回答:其他东西,而不是活动,必须是在 ServiceBus 队列上侦听并负责恢复书签以唤醒您的工作流的东西。

那就是工作流“主机”,或者它的某种扩展。这里有几篇关于如何自定义主机以侦听消息 [从 GUI 按钮] 并唤醒工作流活动的博客文章。

http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx

http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx

您可以做的是获取此代码并将其调整为侦听 ServiceBus 队列而不是 GUI 按钮,并唤醒您自己的 ReceiveFromServiceBus 活动,这类似于 PageActivity - 请注意,您必须编写 NativeActivity 才能使用书签正确。

一切都相当麻烦......但我相信用 WF做这件事的“正确”方法。

于 2014-12-06T07:30:12.020 回答
0

队列实体提供以下功能:“指定将消息添加到队列的时间的能力。”

由于这条规则,在一些超时后您可能不会收到?

可能的解决方案是:

检测入站消息重复,允许客户端多次发送相同的消息而不会产生不良后果。

于 2014-01-14T14:25:17.463 回答
0

问题可能出在 DefaultMessageTimeToLive 或 TimeToLive 属性中。

NamespaceManager.CreateSubscription(
        new SubscriptionDescription(TopicName, SubscriptionName)
            {
                LockDuration = TimeSpan.FromMinutes(5),
                DefaultMessageTimeToLive = TimeSpan.FromDays(7),
                EnableDeadLetteringOnMessageExpiration = true
            });
于 2014-01-14T14:28:20.807 回答