0

我有两个异步运行的服务。service1 正在向 service2(响应队列)中的 MessageQueue 发布消息。Service2 正在从队列中提取每条消息并对其进行处理。问题是当 service1 因为完成发送消息而停止运行时...... service2 无法从队列中读取消息(仍然 service2 正在运行)(仍然队列有一些消息要读取)。

当服务一结束并且应用程序关闭时,就像服务 2 进入休眠状态一样。这是预期的行为吗?有没有解决办法。

我想让我的 service2(msmq 侦听器)继续独立于服务 1 接收消息。

 using System; 
using System.Diagnostics; 
using System.Net.NetworkInformation; 
using System.ServiceModel; 
using System.ServiceProcess; 
using System.Threading; 
using LOGGER; 
using MSMQ.INTERFACE; 
using RightFax; 
using Tools.Helper; 
using Microsoft.VisualBasic; 
using RFCOMAPILib; 
using FAXHandlerClass = FAXHandlerClass; 
using FAXHandlerState = CONTENT.SYSTEM.FAXHandlerState; 

namespace MSMQ.LISTENER 
{ 
    public partial class Service1 : ServiceBase 
    { 
        public static FaxServer RFFaxApi; 
        private const string LogClass = "MSMQ.LISTENER.PROGRAM::"; 
        private static bool _mBoolMustStop; 
        private static readonly RF2MQCounter Counter = new RF2MQCounter(); 
        private static RFaxServerStatus _rightFaxServerStatus; 
        public static FAXHandlerState MThState = new FAXHandlerState(); 
        private IMessageQueueHandler _queue; 
        public Service1() 
        { 
            InitializeComponent(); 
        } 

        protected override void OnStart(string[] args) 
        { 
            try 
            { 
                var logRoust = "OnStart"; 
                Generic.ConfigParam = LoadConfig.Invoke(LogClass); 

                //Ping the server 
                PingReply rep; 
                var mac = PingServer.Mac(out rep, Generic.ConfigParam.RightFax.server); 
                if (rep != null) 
                { 
                    Logger.Log(string.Format(@"Pinging {0} [{1}]", mac, rep.Address)); 
                    Logger.Log(string.Format("Reply From {0} : time={1} TTL={2}", rep.Address, rep.RoundtripTime, 
                                             rep.Options.Ttl)); 
                    //Connect to the Right Fax Server 
                    Actions.Connect(LogClass, Counter, ref RFFaxApi); 

                    //Start readin gthe queue 
                    IMessageQueueHandler _queue = new Test(); 
                    var threadQueuet = new Thread(_queue.StartRead); 
                    Logger.Log(string.Format("Start reading {0} queue...", 
                                             Generic.ConfigParam.MSMQ.responseQueue)); 
                    threadQueuet.Start(); 
                } 
                else 
                { 
                    Logger.Log(string.Format("Not able to  get a reply From {0} : time={1} TTL={2}", rep.Address, 
                                             rep.RoundtripTime, rep.Options.Ttl)); 
                } 
            } 
            catch (PingException e) 
            { 
                throw; 
            } 
            catch (Exception e) 
            { 
                Logger.Log(string.Format("{0} ::Not able to  start the  MSMQ.LISTENER Service on : {1}  Mesage:: {2}", LogClass, Generic.ConfigParam.RightFax.server, e.Message )); 
                throw; 
            } 
        } 


        protected override void OnStop() 
        { 
            if (_queue != null) 
                _queue.StopRead(); 
            Logger.Log(string.Format("Stopping MSMQ.LISTENER Service {0} queue...", Generic.ConfigParam.MSMQ.responseQueue)); 
        } 

        /// <summary> 
        /// Connect to the Rightfax server 
        /// </summary> 
        /// <param name="ref">Used for logging the routine</param> 
        /// <returns>The RightFax server connection status</returns> 
        /// <remarks></remarks> 
        public static RFaxServerStatus ConnectToRightFax(string @ref) 
        { 
            var logRoust = @ref + LogClass + "CONNECTION::"; 

            var retryCounter = 0; 

            Generic.ConfigParam = LoadConfig.Invoke(LogClass,Counter); 
            Logger.Log(string.Format("{0} - Connecting to {1} as user {2}", logRoust, Generic.ConfigParam.RightFax.server, "")); 
            _rightFaxServerStatus = RFaxServerStatus.Connecting; 

            try 
            { 
                if ((RFFaxApi != null)) 
                    RFFaxApi.CloseServer(); 
            } 
            catch 
            { 
                //We do nothing.... We will destroy the object anyway 
                //return false; 
            } 
            RFFaxApi = null; 

            do 
            { 
                MThState.AddEventState = new FAXHandlerClass(FaxHandlerStateEnum.ConnectingRfax); 

                try 
                { 
                    //********************************************************************************************** 
                    //    This section determines how quickly we try to reconnect 
                    //    Try the 1st 5 times 5 second apart 
                    //        the 2nd 5 times 30 seconds apart 
                    //        the 3rd 5 times 60 seconds apart 
                    //        every 300 seconds (5 mins) forever after that 
                    //********************************************************************************************** 
                    int sleepInterval; 
                    if (retryCounter > 15) 
                    { 
                        sleepInterval = 300000; 
                    } 
                    else 
                    { 
                        if (retryCounter > 10) 
                        { 
                            sleepInterval = 60000; 
                        } 
                        else 
                        { 
                            sleepInterval = retryCounter > 5 ? 30000 : 5000; 
                        } 
                    } 


                    //************************************************** 
                    //    Connect to the RightFax Server 
                    //************************************************** 
                    Logger.Log(string.Format("{0} - Attempt # {1}", logRoust, retryCounter)); 

                    if (retryCounter > 0) 
                    { 
                        Logger.Log(string.Format("{0} - Waiting # {1} seconds before trying to reconnect.", logRoust, sleepInterval / 1000)); 
                        Thread.Sleep(sleepInterval); 
                    } 



                    Logger.Log(string.Format("{0} - Initializing Connection to RightFax.", logRoust)); 

                    RFFaxApi = new FaxServer 
                    { 
                        ServerName = Generic.ConfigParam.RightFax.server.Trim(), 
                        UseNTAuthentication = RFCOMAPILib.BoolType.False, 
                        AuthorizationUserID = Generic.ConfigParam.RightFax.userID, 
                        AuthorizationUserPassword = Generic.ConfigParam.RightFax.password, 
                        Protocol = (CommunicationProtocolType)Generic.ConfigParam.RightFax.communicationProtocol 
                    }; 
                    //Verify if the RightFax Service is runing before connecting 
                    //var controller = new ServiceController("RightFax Server Module", _rfFaxApi.ServerName); 
                    //if ((controller.Status.Equals(ServiceControllerStatus.Running))) 
                    //{ 
                    try 
                    { 
                        RFFaxApi.OpenServer(); 
                        _rightFaxServerStatus = RFaxServerStatus.Connected; 
                        retryCounter = 0; 
                        Logger.Log(string.Format("{0} - Connected to {1} (V. {2}) as user {3}", logRoust, RFFaxApi.ServerName, RFFaxApi.Version, RFFaxApi.AuthorizationUserID)); 

                    } 
                    catch (Exception ex) 
                    { 

                        Logger.Log(string.Format("{0} - ERROR Connected to {1} (V. {2}) as user {3} MESSAGE: {4}", logRoust, RFFaxApi.ServerName, RFFaxApi.Version, RFFaxApi.AuthorizationUserID, ex.Message)); 
                    } 


                } 
                catch (System.Runtime.InteropServices.COMException ex) 
                { 
                    //Its OK not to end the loop. This is possibly a temporary error condition. 
                    Logger.Log(string.Format("{0} - Connection failed Message: {1}", logRoust, ex.Message), EventLogEntryType.Warning); 
                } 
                catch (Exception ex) 
                { 
                    if (Strings.InStr(ex.Source, "RFComAPI.FaxServer") > 0 | Strings.InStr(ex.Source, "Interop.RFCOMAPILib") > 0) 
                    { 
                        //Its OK not to end the loop. This is possibly a temporary error condition. 
                        Logger.Log(string.Format("{0} - Connection failed : Message {1} ", logRoust, ex.Message), EventLogEntryType.Warning); 
                    } 
                    else 
                    { 
                        _rightFaxServerStatus = RFaxServerStatus.NotConnected; 
                        Logger.Log(string.Format("{0} - Connection failed. Message: {1}", logRoust, ex.Message), EventLogEntryType.Error); 
                        throw new Exception(string.Format("{0} - Connection failed. Message: {1}", logRoust, ex.Message)); 
                    } 

                } 
                finally 
                { 
                    retryCounter += 1; 
                } 

            } while (!(_rightFaxServerStatus == RFaxServerStatus.Connected | _mBoolMustStop)); 
            return _rightFaxServerStatus; 

        } 
    } 
} 


--------------------------------------------- 
using System.Net.NetworkInformation; 
using System.ServiceProcess; 
using System.Threading; 
using LOGGER; 
using MSMQ.INTERFACE; 
using .RightFax; 
using Tools.Helper; 
using RFCOMAPILib; 
using FAXHandlerState = HandlerState; 

namespace MSMQ.LISTENER 
{ 
    static class Program 
    { 


        private const string LogClass = "MSMQ.LISTENER.PROGRAM::"; 
        public static FaxServer RFFaxApi; 

        private static bool _mBoolMustStop; 
        private static readonly RF2MQCounter Counter = new RF2MQCounter(); 
        private static RFaxServerStatus _rightFaxServerStatus; 
        public static FAXHandlerState MThState = new FAXHandlerState(); 

        /// <summary> 
        /// The main entry point for the application. 
        /// </summary> 
        static void Main() 
        { 
             Run(); 
            //OR comment above to be able to Debug 
            //Uncomment below to start in debug mode 
         // Start(); 
        } 

        private static void Run() 
        { 
            var servicesToRun = new ServiceBase[] 
                                              { 
                                                  new Service1() 
                                              }; 
            ServiceBase.Run(servicesToRun); 
        } 
        public static void StartThreadProc(object stateInfo) 
        { 
            Start(); 
        } 


        public static void Start() 
        { 
            try 
            { 
                var logRoust = "OnStart"; 
                Generic.ConfigParam = LoadConfig.Invoke(LogClass); 

                //Ping the server 
                PingReply rep; 
                var mac = PingServer.Mac(out rep, Generic.ConfigParam.RightFax.server); 
                if (rep != null) 
                { 
                    Logger.Log(string.Format(@"Pinging {0} [{1}]", mac, rep.Address)); 
                    Logger.Log(string.Format("Reply From {0} : time={1} TTL={2}", rep.Address, rep.RoundtripTime, 
                                             rep.Options.Ttl)); 
                    //Connect to the Right Fax Server 
                    Actions.Connect(LogClass, Counter, ref RFFaxApi); 

                    //Start readin gthe queue 
                    IMessageQueueHandler _queue = new Isoconvoceresponse(); 
                    var threadQueuet = new Thread(_queue.StartRead); 
                    Logger.Log(string.Format("Start reading {0} queue...", 
                                             Generic.ConfigParam.MSMQ.responseQueue)); 
                    threadQueuet.Start(); 
                } 
                else 
                { 
                    Logger.Log(string.Format("Not able to  get a reply From {0} : time={1} TTL={2}", rep.Address, 
                                             rep.RoundtripTime, rep.Options.Ttl)); 
                } 
            } 
            catch (PingException e) 
            { 
                throw; 
            } 
        } 


    } 
} 


___________________________ 
namespace MSMQ.INTERFACE 
{ 
    public interface IMessageQueueHandler 
    { 
        void StartRead(); 
        void StopRead(); 
    } 
} 
-------------- 
using System; 
using System.Diagnostics; 
using System.Globalization; 
using System.Messaging; 
using System.Net.NetworkInformation; 
using System.ServiceModel; 
using System.ServiceProcess; 
using System.Threading; 
using CONTENT.SYSTEM; 
using LOGGER; 
using MSMQ.INTERFACE; 
using RightFax; 
using RightFax.Tools.Helper; 
using RFCOMAPILib; 
using MessageQueue = System.Messaging.MessageQueue; 
using RF2MQCounter = RF2MQCounter; 

namespace MSMQ.LISTENER 
{ 

    public class Test : IMessageQueueHandler 
    { 
        private MessageQueue _queue; 
        private readonly string _queueName; 
        private const string LogClass = ".MSMQ.LISTENER::"; 
        private readonly ManualResetEvent manualResetEvent = new ManualResetEvent(true); 
        private long handle ; 

        /// <summary> 
        /// 
        /// </summary> 
        public Test() 
        { 

            const string logroust = LogClass + "Isoconvoceresponse"; 
            Generic.ConfigParam = LoadConfig.Invoke(logroust); 
            _queueName =Generic.ConfigParam.MSMQ.responseQueue; 
        } 

        /// <summary> 
        /// 
        /// </summary> 
        public void StartRead() 
        { 

           // System.Diagnostics.Debugger.Break(); 
            const string logRoust = LogClass + "StartRead::"; 
            try 
            { 
                Logger.Log(String.Format("{0} - Start Reading the {1} Queue .", logRoust, Generic.ConfigParam.MSMQ.responseQueue )); 
                _queue = new MessageQueue(_queueName) {Formatter = new XmlMessageFormatter(new[] {typeof (string)})}; 

                _queue.MessageReadPropertyFilter.SetAll(); 
                var objDefProps = new System.Messaging.DefaultPropertiesToSend 
                { 
                    Priority = MessagePriority.High, 
                    Recoverable = true, 
                    UseDeadLetterQueue = true, 
                    UseJournalQueue = true, 

                }; 
                _queue.DefaultPropertiesToSend = objDefProps; 



                _queue.PeekCompleted += QueuePeekCompleted; 
                _queue.BeginPeek(); 
                //_queue.ReceiveCompleted += QueueReceiveCompletedd;//Event handler 
                //_queue.BeginReceive(); 



            } 
            catch (Exception ex) 
            { 
                Actions.SetGenericFlagOff(handle.ToString(CultureInfo.InvariantCulture),Generic.ConfigParam.dbConnectionString); 
                Logger.Log( 
                    String.Format("{0} - Start Reading the {1} failed  : Error: {2}.", logRoust, Generic.ConfigParam.MSMQ.responseQueue, ex.Message), 
                    EventLogEntryType.Error); 
                throw; 
            } 
        } 

        public static void OnServiceFaulted(object sender, EventArgs e) 
        { 

            Logger.Log(string.Format("{0} ::Service Faulted: {1}  Mesage:: {2}", LogClass, Generic.ConfigParam.RightFax.server, e.ToString())); 

        } 


        public void StopRead() 
        {//make the process synchronous before closing the queue 
            manualResetEvent.WaitOne(); 

            if (_queue == null) return; 
            _queue.Close(); 
            _queue = null; 
        } 

        /// <summary> 
        /// 
        /// </summary> 
        /// <param name="sender"></param> 
        /// <param name="e"></param> 
        private void QueuePeekCompleted(object sender, PeekCompletedEventArgs e) 
        { 


           // System.Diagnostics.Debugger.Break(); 
            const string logRoust = LogClass + "QueuePeekCompleted::"; 

            var message = _queue.EndPeek(e.AsyncResult); 
            _queue.Receive(); 
            _queue.BeginPeek(); 

          var allMessagesOnResponseQueue = _queue.GetAllMessages(); 

          foreach (var msg in allMessagesOnResponseQueue) 
          { 
              Logger.Log(String.Format("{0} - Messages QueuePeekCompleted event handler {1}", logRoust, message.Label + " - " + message.Id)); 


          } 
          do 
            { 

                try 
                { 
                    if (message.MessageType == MessageType.Acknowledgment) 
                        switch (message.Acknowledgment) 
                        { 
                            case Acknowledgment.Purged: 
                                Logger.Log("Message Purged {0}", message.Body.ToString()); 
                                break; 
                            case Acknowledgment.QueueExceedMaximumSize: 
                                Logger.Log("Message Queue Exceed MaximumSize {0}", message.Body.ToString()); 
                                break; 
                            case Acknowledgment.QueuePurged: 
                                Logger.Log("Message Queue Purged {0}", message.Body.ToString()); 
                                break; 
                            case Acknowledgment.ReceiveTimeout: 
                                Logger.Log("Message ReceiveTimeout {0}, Now restarting MSMQ.LISTENER Service", 
                                           message.Body.ToString()); 
                                var controller = new ServiceController("MSMQ.LISTENER", "STHA38994.iad.ca.inet"); 
                                if (controller.Status.Equals(ServiceControllerStatus.Running)) 
                                    controller.Start(); 
                                break; 
                            case Acknowledgment.ReachQueue: 
                                Logger.Log("Message Reached Queue {0}", message.Body.ToString()); 
                                break; 
                            case Acknowledgment.Receive: 
                                Logger.Log("Message Received {0}", message.Body.ToString()); 
                                break; 
                        } 

                    /
                    if (message.MessageType == MessageType.Normal) 
                    { 
                        var messageDetail = message.Label.Split(' '); 
                        var pdfFileDetails = messageDetail[1].Split('_'); 
                         handle = Convert.ToInt64(pdfFileDetails[0]); 
                        var isSucessfull = false; 

                        FaxServer faxServer = null; 
                        if (MSMQ.LISTENER.Service1.RFFaxApi == null) 
                        { 
                            Generic.ConfigParam = LoadConfig.Invoke(LogClass); 
                            var counter = new RF2MQCounter(); 

                            Actions.Connect(LogClass, counter, ref faxServer); //you can pass a null faxserver 

                        } 

                        if (faxServer != null) 
                        { 


                            try 
                            { 
                                Logger.Log("Getting Fax {0}", handle.ToString(CultureInfo.InvariantCulture)); 
                                var rfaxFax = Actions.GetFax(handle, ref faxServer); 
                                Logger.Log("Getting Fax {0} SUCESS", handle.ToString(CultureInfo.InvariantCulture)); 
                                Generic.NumberOfRecordsSent++; 
                                Logger.Log("Processing message {0}", handle.ToString(CultureInfo.InvariantCulture)); 
                                isSucessfull = RFServiceClass.ProcessMessage(message, ref rfaxFax); 
                                Logger.Log("Processing message SUCESS {0}", 
                                           handle.ToString(CultureInfo.InvariantCulture)); 

                            } 
                            catch (Exception ex) 
                            { 
                                Generic.NumberOfRecordsSent--; 
                                Logger.Log( 
                                    String.Format("{0} - ERROR processinf fax from the queue.. {1}", logRoust, 
                                                  ex.Message), EventLogEntryType.Error); 
                                Actions.SetGenericFlagOff(handle.ToString(CultureInfo.InvariantCulture), 
                                                          Generic.ConfigParam.dbConnectionString); 
                            } 
                        } 
                    } 
                } 
                catch (MessageQueueException msq) 
                { 
                    var messageDetail = message.Label.Split(' '); 
                    var pdfFileDetails = messageDetail[1].Split('_'); 
                    long handle = Convert.ToInt64(pdfFileDetails[0]); 
                    Actions.SetGenericFlagOff(handle.ToString(CultureInfo.InvariantCulture), Generic.ConfigParam.dbConnectionString); 
                    switch (msq.MessageQueueErrorCode) 
                    { 
                        case MessageQueueErrorCode.IOTimeout: 
                            Logger.Log( 
                                String.Format("{0} - Error Message IOTimeout Exception: {1}.", logRoust, msq.Message), 
                                EventLogEntryType.Error); 
                            Actions.SetGenericFlagOff(handle.ToString(CultureInfo.InvariantCulture),Generic.ConfigParam.dbConnectionString ); 
                            break;                     
                         default: 
                            Logger.Log( 
                                String.Format("{0} - Error Message DEFAULT Exception: {1}.", logRoust, msq.Message), 
                                EventLogEntryType.Error); 
                            break; 
                    } 

                } 
                catch (Exception ex) 
                { 
                    Actions.SetGenericFlagOff(handle.ToString(CultureInfo.InvariantCulture), Generic.ConfigParam.dbConnectionString); 
                    Logger.Log( 
                        String.Format("{0} - Error QueuePeekCompleted: {1}.", logRoust, ex.Message), 
                        EventLogEntryType.Error); 
                    message = null; 
                } 
            } while (message != null); 

        } 
        } 

        public override string ToString() 
        { 
            return _queueName; 
        } 
    } 
} 
4

0 回答 0