0

我正在使用 azure 服务总线主题。因为我有大消息,所以我将大消息拆分并以带有 sessionid 和拆分顺序的小消息发送。我希望我的接收器具有事件驱动的架构。因为我必须接收具有相同 sessionid 的所有消息,并且必须使用正确的拆分顺序聚合它们。下面是我的代码。但只是我第一次从波纹管代码中收到消息。在第二条消息中它超时。

         public class CRMESBListener : RoleEntryPoint
            {
                private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

            public override void Run()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
                try
                {
                    DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
                    dbMessageListener.Listen();
                    runCompleteEvent.WaitOne();
                    //this.RunAsync(this.cancellationTokenSource.Token).Wait();
                }
                finally
                {
                    this.runCompleteEvent.Set();
                }
            }

            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;

                // For information on handling configuration changes
                // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.


                bool result = base.OnStart();
                Bootstrapper.Init();

                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");

                return result;
            }

            public override void OnStop()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");

                this.cancellationTokenSource.Cancel();
                this.runCompleteEvent.WaitOne();

                base.OnStop();

                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
            }

            private async Task RunAsync(CancellationToken cancellationToken)
            {
                // TODO: Replace the following with your own logic.
                while (!cancellationToken.IsCancellationRequested)
                {
                    Trace.TraceInformation("Working");
                    await Task.Delay(1000);
                }
            }
        }













      public class DBMessageListener
        {
            #region Member Variables

            private static DBMessageListener dbMessageListner;
            private static object lockObject = new object();
            private TopicSubscribeClientWrapper accountTopicClient;

            private NamespaceManager namespaceManager;
            private OnMessageOptions eventDrivenMessagingOptions;

            private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;

            #endregion Member Variables

            #region Constructors

            private DBMessageListener()
            {
                string subscriptionName = "AllMessages";
                namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);

                if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
                {
                    namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
                }
                accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
                accountTopicClient.SubscriptionName = subscriptionName;



                eventDrivenMessagingOptions = new OnMessageOptions
                {
                    AutoComplete = true
                };

                eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
                eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
            }

            #endregion Constructors

            #region Methods

            private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
            {
                if (message != null)
                {
                    try
                    {
                        await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
                    }
                    catch (Exception ex)
                    {
                        //log exception
                    }
                }

            }

            private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
            {
                if (e != null && e.Exception != null)
                {

                }
            }

            private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
            {

    //process message           
            }

            public static DBMessageListener GetDBMessageListner()
            {
                if (dbMessageListner == null)
                {
                    lock (lockObject)
                    {
                        if (dbMessageListner == null)
                        {
                            dbMessageListner = new DBMessageListener();
                        }
                    }
                }

                return dbMessageListner;
            }

            public void Listen()
            {
                accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);

            }

            #endregion Methods
        }  


 public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
    {
        #region Member Variables

        private readonly string _connectionString;
        private readonly string _topicName;
        private readonly TopicClient _topicClient;
        private SubscriptionClient _subscriptionClient;

        #endregion Member Variables

        #region Properties

        public string SubscriptionName { get; set; }

        #endregion Properties

        #region Constructors

        public TopicSubscribeClientWrapper(string connectionString, string topicName)
        {
            _connectionString = connectionString;
            _topicName = topicName;
            _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
        }

        #endregion Constructors

        #region Event Handlers

        public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
        {

            _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);

           // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);

            MemoryStream largeMessageStream = new MemoryStream();
            MessageSession session = _subscriptionClient.AcceptMessageSession();

            while (true)
            {
                BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));

                if (subMessage != null)
                {
                    Stream subMessageStream = subMessage.GetBody<Stream>();
                    subMessageStream.CopyTo(largeMessageStream);

                    subMessage.Complete();
                    //Console.Write(".");
                }
                else
                {
                    //Console.WriteLine("Done!");
                    break;
                }
            }

            BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
            var message = onMessageCallback.Method.GetParameters();
            message.SetValue(largeMessage, 1);
            _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);

        }

        #endregion Event Handlers

        #region Methods

        public Task SendAsync(BrokeredMessage message)
        {
            return _topicClient.SendAsync(message);
        }

        public void Close()
        {
            if (_subscriptionClient != null)
            {
                _subscriptionClient.Close();
            }

            _topicClient.Close();
        }

        #endregion Methods
    }
4

1 回答 1

1

我建议采取不同的路线。与其尝试创建消息会话来传递大型消息,不如使用专门解决此问题的声明检查模式- 大型附件。将您的数据写入存储 blob,并让 URI 随消息一起发送。保存/恢复 blob 会简单得多,而不是尝试以块的形式发送有效负载。此外,这种方式更容易监控您的系统(与一个或多个 blob 相关联的一条失败的成功/失败消息)并且您不需要不必使用会话或任何特殊的东西。

于 2016-12-05T05:24:41.780 回答