0

我正在尝试将 Microsoft“发布”的 CloudFx 库与一组示例一起使用。关于该库以及应该如何使用它的文档非常少(除了服务总线),但我正在尝试使用 CHM 参考和示例。我试图复制他们在通过存储队列进行通信的简单生产者/消费者工作者角色中所做的事情,但我似乎无法让 CloudQueueListenerExtension 类按我希望的方式工作。首先,我围绕监听器编写了一个简单的包装类:

public class QueueListener<T>
{
    private readonly CloudQueueLocation _queueLocation;
    private readonly CloudQueueListenerExtension<T> _queueListenerExtension;
    private readonly IObserver<T> _observer;

    public static QueueListener<T> StartNew(IExtensibleComponent owner, string storageAccount, Action<T> action)
    {
        var location = new CloudQueueLocation()
                            {
                                StorageAccount = storageAccount,
                                QueueName = typeof(T).Name.ToLowerInvariant()
                            };
        return new QueueListener<T>(owner, location, action).Start();
    }

    protected QueueListener(IExtensibleComponent owner, CloudQueueLocation queueLocation, Action<T> action)
    {
        _queueLocation = queueLocation;
        _queueListenerExtension = new CloudQueueListenerExtension<T>(queueLocation, owner);
        _observer = Observer.Create(action);

        _queueListenerExtension.Subscribe(_observer);            
    }

    protected QueueListener<T> Start()
    {
        _queueListenerExtension.StartListener();
        return this;
    }
}

然后我将它设置为主要工作角色,如下所示:

QueueListener<MyMessageType>.StartNew(this, storageAccountString,
         newMsg => _log.InfoFormat("Got {0}", newMsg));

我有一个将 MyMessageType 消息发布到队列的 Web 应用程序,但该操作从未执行。我在诊断日志中看到一些痕迹,表明侦听器指向了正确的存储帐户和正确的队列,我什至看到了对ReliableCloudQueueStorage.Get. 我无法构建示例,但我认为我正在以与示例完全相同的方式使用侦听器扩展。

关于可能发生什么的任何想法?

4

1 回答 1

1

感谢您的精彩解释和代码片段。这对于故障排除非常有用。:-)

我们最近将 CloudFx 迁移到 Reactive Extensions 2.0,在我们进行迁移时,我们未能了解我们如何在队列侦听器组件内部处理可观察序列的一些细节。现在已经解决了这个问题,我刚刚推出了一个新版本的 NuGet 包 (1.3.0.1),旨在解决您面临的挑战。

让我们知道您的进展情况。

瓦莱里

于 2012-08-30T03:47:31.083 回答