4

我正在使用服务总线连接 Web 角色和辅助角色。我的工作角色处于连续循环中,我正在使用 QueueClient.Receive() 方法接收 Web 角色发送的消息。

但是使用这种方法,如果服务总线队列上没有消息,它会等待几秒钟来接收消息,而不是移动到下一行继续执行。我希望有一些异步方法来接收消息?或者至少以某种方式设置这个等待时间?

我从 QueueClient 的 msdn 文档中找到了这个 BeginReceive 方法 我希望这将是我问题的答案,但我不知道如何使用这个方法。方法参数是异步回调和对象状态,我不知道它们是什么。

有任何想法吗?

更新: 感谢 Sandrino 的出色解决方案,它可以异步工作。但是异步现在给我带来了一些问题。我的 VS 崩溃了。不确定是什么问题。下面是正在使用的代码。

工人角色:

public override void Run()
    {
while (!IsStopped)
        {                
                // Receive the message from Web Role to upload the broadcast to queue
                BroadcastClient.BeginReceive(OnWebRoleMessageReceived, null);                    

                // Receive the message from SignalR BroadcastHub
                SignalRClient.BeginReceive(OnSignalRMessageReceived, null);                    

            }
}


public void OnWebRoleMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedBroadcastMessage = null;
        receivedBroadcastMessage = BroadcastClient.EndReceive(iar);

        if (receivedBroadcastMessage != null)
        {   //process message
           receivedBroadcastMessage.Complete();
        }
    }

public void OnSignalRMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedSignalRMessage = null;
        receivedSignalRMessage = SignalRClient.EndReceive(iar);

        if (receivedSignalRMessage != null)
        {
            //process message
           receivedSignalRMessage.Complete();

           WorkerRoleClient.Send(signalRMessage);
        }
     }

我是否错过了使 VS 过度工作和崩溃的任何事情?因为在转移到 BeginReceive 之前,当我使用 QueueClient.Receive 时它工作正常并且没有崩溃。

谢谢

4

3 回答 3

6

BeginReceive方法是您的情况的方法。你通常会这样称呼它:

void SomeMethod() 
{
     ...
     client.BeginReceive(TimeSpan.FromMinutes(5), OnMessageReceived, null);
     ...
}

void OnMessageReceived(IAsyncResult iar)
{
     var msg = client.EndReceive(iar);
     if (msg != null)
     {
         var body = msg.GetBody<MyMessageType>();
         ...
     }
}
于 2013-02-13T12:05:48.170 回答
1

我就是这样做的(扩展 Sandrino De Mattia 的解决方案):

void SomeMethod() 
{
    ...
    client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
    ...
}

void OnMessageReceived(IAsyncResult iar)
{
    if(!IsStopped)
    {
        var msg = client.EndReceive(iar);
        if (msg != null)
        {
            var body = msg.GetBody<MyMessageType>();

            ... //Do something interesting with the message

            //Remove the message from the queue
            msg.Complete();

            client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
        }
    }
}

这样我就有了一个带有停止机制的“无限循环”。

于 2014-11-03T15:07:21.153 回答
1

最新版本的 Azure ServiceBus SDK(下载链接)全面支持异步接收消息:

async Task TestMethod()
{
    string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

    QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, "TestQueue");
    var message = await Client.ReceiveAsync();
}
于 2015-12-17T10:31:04.667 回答