4

我目前正在尝试从会话中检索特定消息。

为此,我使用希望使用 . 在我传入消息序列号的MessageSession上接收Int64)。

这是我的代码 -

long msgSequenceNr = 1337;

QueueClient queueClient = QueueClient.CreateFromConnectionString(Constants.ServiceBusConnectionString, Constants.TestQueueEntityName, ReceiveMode.PeekLock);
MessageSession msgSession = queueClient.AcceptMessageSession(Constants.TestSessionId);

var peekedMsg = msgSession.Peek(msgSequenceNr); // <-- Works fine!
var receivedMsg = msgSession.Receive(msgSequenceNr); // <-- MessageNotFoundException

不幸的是,当 Peek 工作正常时,Receive 将导致MessageNotFoundException 。这是我错过的限制还是有其他方法可以实现这一点。

请注意,会话中可能存在多条消息

4

1 回答 1

4

带有 SequenceNumber 的接收只能与 Defer 方法结合使用。这是你将如何实现它:

  1. 收到消息,但现在无法处理(可能正在等待其他进程完成)。
  2. 将 SequenceNumber 持久保存在一些持久存储(表存储、SQL 数据库……)中
  3. 当您知道处理可以继续时(例如:相关进程已完成),从您的持久存储中加载所有序列号。
  4. 使用 Receive(int sequenceNumber) 或 ReceiveBatch(int[] sequenceNumbers) 接收和处理您的延迟消息。

示例应用程序:https ://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-ccc4f879#content

更新:

形成您的评论我注意到“取消延迟”延迟消息可能是一种解决方案。这是一些取消延迟消息的示例代码,它将延迟消息复制到新消息,完成延迟消息并将新消息发送回队列中。这使用 TransactionScope 以事务方式完成并重新发送消息,以避免丢失消息的风险:

    var messageId = "12434539828282";

    // Send.
    var msg = new BrokeredMessage {SessionId = "user1", MessageId = messageId };
    msg.Properties.Add("Language", "Dutch");
    queue.Send(msg);

    // Receive.
    var session = queue.AcceptMessageSession();
    msg = session.Receive();

    // Store the sequence number.
    var sequenceNumber = msg.SequenceNumber;

    // Defer.
    msg.Defer();

    // Change to true to test if the transaction worked.
    var shouldThrow = false;

    // Later processing of deferred message.
    msg = session.Receive(sequenceNumber);

    try
    {
        using (var ts = new TransactionScope())
        {
            // Create a new message.
            var undeferredMessage = new BrokeredMessage {SessionId = msg.SessionId, MessageId = msg.MessageId};
            foreach (var prop in msg.Properties)
                undeferredMessage.Properties.Add(prop);

            // Complete and send within the same transaction.
            msg.Complete();
            if (shouldThrow)
                throw new InvalidOperationException("Some error");
            queue.Send(undeferredMessage);

            // Complete the transaction.
            ts.Complete();
        }
    }
    catch (Exception ex)
    {
        msg.Abandon();
    }

    if (shouldThrow)
    {
        msg = session.Receive(sequenceNumber);
        Console.WriteLine(msg.MessageId + " should match: " + messageId);
    }
    else
    {
        try
        {
            msg = session.Receive(sequenceNumber);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Message not found, transaction worked OK.");
        }
    }

注意:这里我只是复制属性。请记住,您可能想要复制正文和任何其他附加信息。

于 2014-10-23T14:18:26.410 回答