1

我正在尝试编写一个集成/验收测试来测试一些天蓝色的代码,问题 ATM 中的代码只是订阅一个主题并发布到另一个主题。

我已经编写了测试,但它并不总是通过,似乎可能存在竞争条件。我尝试用几种方法编写它,包括使用 OnMessage 和使用 Receive(我在这里展示的示例)。

使用 OnMessage 时,测试似乎总是过早退出(大约 30 秒),我想这可能意味着它不适合这个测试。

我的查询具体涉及我的示例,我假设一旦我创建了对目标主题的订阅,任何发送给它的消息我都可以使用 Receive() 来接收,无论消息到达的时间点意味着什么,如果消息到达在我调用 Receive() 之前的目标主题中,我仍然可以通过调用 Receive() 来阅读消息。任何人都可以对此有所了解吗?

    namespace somenamespace {
    [TestClass]
    public class SampleTopicTest
    {
        private static TopicClient topicClient;
        private static SubscriptionClient subClientKoEligible;
        private static SubscriptionClient subClientKoIneligible;

        private static OnMessageOptions options;
        public const string TEST_MESSAGE_SUB = "TestMessageSub";
        private static NamespaceManager namespaceManager;

        private static string topicFleKoEligible;
        private static string topicFleKoIneligible;

        private BrokeredMessage message;

        [ClassInitialize]
        public static void BeforeClass(TestContext testContext)
        {
            //client for publishing messages
            string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
            string topicDataReady = ConfigurationManager.AppSettings["DataReadyTopicName"];
            topicClient = TopicClient.CreateFromConnectionString(connectionString, topicDataReady);

            topicFleKoEligible = ConfigurationManager.AppSettings["KnockOutEligibleTopicName"];
            topicFleKoIneligible = ConfigurationManager.AppSettings["KnockOutIneligibleTopicName"];

            //create test subscription to receive messages
            namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);


            if (!namespaceManager.SubscriptionExists(topicFleKoEligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoEligible, TEST_MESSAGE_SUB);
            }

            if (!namespaceManager.SubscriptionExists(topicFleKoIneligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoIneligible, TEST_MESSAGE_SUB);
            }

            //subscriber client koeligible
            subClientKoEligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoEligible, TEST_MESSAGE_SUB);

            subClientKoIneligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoIneligible, TEST_MESSAGE_SUB);

            options = new OnMessageOptions()
            {
                AutoComplete = false,
                AutoRenewTimeout = TimeSpan.FromMinutes(1),

            };
        }

          [TestMethod]
        public void E2EPOCTopicTestLT50()
        {
            Random rnd = new Random();
            string customerId = rnd.Next(1, 49).ToString();

            FurtherLendingCustomer sentCustomer = new FurtherLendingCustomer { CustomerId = customerId };
            BrokeredMessage sentMessage = new BrokeredMessage(sentCustomer.ToJson());           
            sentMessage.CorrelationId = Guid.NewGuid().ToString();
            string messageId = sentMessage.MessageId;
            topicClient.Send(sentMessage);

            Boolean messageRead = false;

            //wait for message to arrive on the ko eligible queue
            while((message = subClientKoEligible.Receive(TimeSpan.FromMinutes(2))) != null){

                //read message
                string messageString = message.GetBody<String>();

                //Serialize
                FurtherLendingCustomer receivedCustomer =  JsonConvert.DeserializeObject<FurtherLendingCustomer>(messageString.Substring(messageString.IndexOf("{")));

                //assertion
                Assert.AreEqual(sentCustomer.CustomerId, receivedCustomer.CustomerId,"verify customer id");

                //pop message
                message.Complete();
                messageRead = true;

                //leave loop after processing one message
                break;
            }
            if (!messageRead)
                Assert.Fail("Didn't receive any message after 2 mins");

        }
    }
}
4

1 回答 1

0

正如官方文档所述SubscriptionClient.Receive(TimeSpan)

参数 serverWaitTime TimeSpan

服务器在超时之前等待接收消息的时间跨度。

如果操作超过指定的超时时间,或者操作成功但没有更多消息要接收,则此 API 可以返回 Null

根据我的测试,如果一条消息发送到主题,然后在您的特定 serverWaitTime 内传递到您的订阅,那么无论消息是在您调用之前还是之后到达目标主题,您都可以收到一条消息Receive

使用 OnMessage 时,测试似乎总是过早退出(大约 30 秒),我想这可能意味着它不适合这个测试。

[TestMethod]
public void ReceiveMessages()
{
    subClient.OnMessage(msg => {
        System.Diagnostics.Trace.TraceInformation($"{DateTime.Now}:{msg.GetBody<string>()}");
        msg.Complete();
    });
    Task.Delay(TimeSpan.FromMinutes(5)).Wait();
}

在此处输入图像描述

对于Subscription​Client.​On​Message,我假设它基本上是一个循环调用Receive。调用后OnMessage,需要稍等片刻,停止该方法退出。这是一篇关于windows Azure Service Bus的事件驱动消息编程的博客,你可以参考这里

另外,我发现你的topicClient用于发送消息和subClientKoEligible用于接收消息的不是针对同一个主题路径。

于 2017-05-15T09:29:00.437 回答