8

1.)从.net客户端,我如何测试客户端是否连接到服务器(即可以发送和接收)是的,我可以在try块内发送消息并捕获随后的异常,但我希望一个更优雅的解决方案。

2) 如何打开、关闭和重新打开连接?在我尝试解决上面的问题 1 时,我发现如果我打开一个连接然后调用 connection.Close() 我无法从连接工厂获得另一个连接(参见下面的代码片段)。我收到错误消息 XMSCC0008

我正在使用一个非常标准的 vanilla MQ 配置。这是我的客户端的连接方式:

ISession session = MQAccess.GetSession(MQAccess.Connection);
IDestination destination = session.CreateTopic(SubTopicName);
Consumer = MQAccess.GetConsumer(session, destination);
Consumer.MessageListener = new MessageListener(HandleMQSubEvent);
MQAccess.Connection.Start();

其中 MQAccess 是一个小型实用程序类。

编辑问题以添加 MQAccess 代码:

public static class MQAccess
{
    public static readonly MQConfigurationSectionHandler ConfigSettings;
    public static readonly IConnectionFactory ConnectionFactory;

    private static readonly IConnection connection;
    public static IConnection Connection
    {
        get { return connection; }
    }

    static MQAccess()
    {
        ConfigSettings = (MQConfigurationSectionHandler)
            ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        }
        else
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
        }

        connection = GetConnection();
    }

    public static IConnection GetConnection()
    {
        return ConnectionFactory.CreateConnection();
    }

    public static ISession GetSession(IConnection connection)
    {
        return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
    }

    public static IMessageProducer GetProducer(ISession session, IDestination destination)
    {
        return session.CreateProducer(destination);
    }

    public static IMessageConsumer GetConsumer(ISession session, IDestination destination)
    {
        return session.CreateConsumer(destination);
    }

    public static void MQPub(string TopicURI, string message)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    producer.Send(session.CreateTextMessage(message));
                }
            }
        }
    }

    public static void MQPub(string TopicURI, IEnumerable<string> messages)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    foreach (var message in messages)
                    {
                        producer.Send(session.CreateTextMessage(message));
                    }
                }
            }
        }
    }
}

编辑:将 MQAccess 类重命名为 MQClient。根据 T Rob 的建议将其设为实例类。断开连接方法仍然崩溃并出现上面列出的错误消息

public class MQClient : IDisposable
{
    public MQConfigurationSectionHandler ConfigSettings { get; private set; }
    public IConnectionFactory ConnectionFactory { get; private set; }

    public IConnection Connection { get; private set;  }

    public IMessageConsumer Consumer { get; private set; }
    public IMessageProducer Producer { get; private set; }
    // Save sessions as fields for disposing and future subscription functionality
    private ISession ProducerSession;
    private ISession ConsumerSession;
    public string SubTopicName { get; private set; }
    public string PubTopicName { get; private set; }
    public bool IsConnected { get; private set; }
    public event Action<Exception> ConnectionError;
    private Action<IMessage> IncomingMessageHandler;

    public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler)
    {
        // Dont put connect logic in the constructor.  If we lose the connection we may need to connect again.
        SubTopicName = subTopicName;
        PubTopicName = pubTopicName;
        IncomingMessageHandler = incomingMessageHandler;
    }

    public string Connect()
    {
        IsConnected = false;
        string errorMsg = string.Empty;

        ConfigSettings = (MQConfigurationSectionHandler)
                ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        else
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);

        Connection = ConnectionFactory.CreateConnection();


        if (!string.IsNullOrEmpty(PubTopicName))
        {
            ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName));
        }

        if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null)
        {
            ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName));
            Consumer.MessageListener = new MessageListener(IncomingMessageHandler);
        }

        try
        {
            Connection.Start();
            Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler);
            IsConnected = true;
        }
        catch (TypeInitializationException ex)
        {
            errorMsg = "A TypeInitializationException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }
        catch (IllegalStateException ex)
        {
            errorMsg = "An IllegalStateException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }

        return errorMsg;
    }

    public void Disconnect()
    {
        if (Producer != null)
        {
            Producer.Close();
            Producer.Dispose();
            Producer = null;
        }

        if (ProducerSession != null)
        {
            // Call Unsubscribe here if subscription is durable

            ProducerSession.Close();
            ProducerSession.Dispose();
            ProducerSession = null;
        }

        if (Connection != null)
        {
            Connection.Stop();

            //if (Connection.ExceptionListener != null)
            //    Connection.ExceptionListener = null;

            // Per Shashi............
            //if (Consumer.MessageListener != null)
            //    Consumer.MessageListener = null;

            Connection.Close();
            Connection.Dispose();
            Connection = null;
        }

        if (Consumer != null)
        {

            if (Consumer.MessageListener != null)
                Consumer.MessageListener = null;

            Consumer.Close();
            Consumer.Dispose();
            Consumer = null;
        }


        if (ConsumerSession != null)
        {
            // Call Unsubscribe here if subscription is durable
            ConsumerSession.Close();
            ConsumerSession.Dispose();
            ConsumerSession = null;
        }

        IsConnected = false;
    }


    public void Publish(string message)
    {
        Producer.Send(ProducerSession.CreateTextMessage(message));
    }


    public void Publish(string[] messages)
    {
        foreach (string msg in messages)
            Publish(msg);
    }

    public void ConnectionExceptionHandler(Exception ex)
    {
        Disconnect(); // Clean up

        if (ConnectionError != null)
            ConnectionError(ex);
    }

    #region IDisposable Members
    private bool disposed;

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
                Disconnect();

            disposed = true;
        }
    }
    #endregion

}
4

2 回答 2

8

问题在这里-->where MQAccess is a small utility class.

问题的第一部分询问如何判断连接是否处于活动状态。WebSphere MQ 的 XMS 类是针对非 Java 平台的 JMS 规范的实现。它们非常严格地遵循 JMS 规范,并且 JMS 规范没有与连接或会话等效的方法,isConnected因此 XMS 也没有。但是,所有 GET 和 PUT 活动都应该发生在 try/catch 块中,以便捕获 JMS 异常。(您总是从中打印linkedException,对吗?)当抛出 JMS 异常时,应用程序要么将其视为致命并终止,要么关闭除连接工厂之外的所有 JMS 对象,等待几秒钟,然后重新驱动连接顺序。

根据问题中的新信息进行更新:
感谢您发布 MQAccess 类。这为正在发生的事情提供了相当多的洞察力,尽管仍然没有任何代码显示根据问题的第 2 部分关闭和重新打开连接的位置。

但是,代码显示该类在构造类实例MQAccess时创建了一个私有实例,ICONNECTION connection然后将其公开公开为MQAccess.GetConnection. 当前发布的MQAccess类没有公共或私有类方法可以替换所持有的连接句柄,connection因此如果MQAccess.Connection.Close()曾经调用过,类中IConnection的该对象实例MQAccess将永远持有无效的连接句柄。一旦连接关闭,该实例MQAccess实际上就死了。您必须删除并重新实例化MQAccess才能获得新连接。

该类MQAccess确实公开了连接工厂,因此理论上可以MQAccess.GetConnection从类外部调用并获得一个有效的新IConnection对象,即使在关闭原始对象之后也是如此。但是,该实例将存在于MQAccess类范围之外,因此任何后续调用都MQAccess将引用其失效的实例变量connection,而不是在类之外创建的新连接实例。

如果您需要关闭并重新创建连接,您可以考虑从MQAccess. 一种技术含量较低的方法可能是为MQAccess.Close()连接编写一个方法,该方法将关闭现有连接然后立即调用connection = GetConnection();,以便私有connection变量始终保持有效的连接句柄。

如果这不能解决问题,请发布关闭并重新创建连接的代码。

顺便说一句,网络连接上的非事务会话为任何 JMS 提供者(包括 WMQ)打开了丢失或重复消息的可能性。这是你的本意吗?我已经解释了为什么这是在另一个 SO 帖子中。

于 2012-10-19T02:50:56.483 回答
5

添加到 T.Rob 的评论。

问题 1:
希望您能获得MQAccess. 如果是,您可以在其中公开一个属性MQAccess,指示连接是否处于活动状态。如果您没有访问权限,那么您可能必须要求该类的作者添加此属性。您可以执行以下操作来设置/重置属性。

1) createConnection方法返回成功后设置属性。
2)为连接设置一个异常监听器。
3) 重置异常处理程序中的属性。如果是连接断开错误(XMSWMQ1107 和链接的异常可能有 MQRC 2009),请检查原因代码并重置属性。

问题 2
如果您能向我们展示您的身份closingreopening关系,将会有所帮助。我关闭连接的建议是:
1)首先做一个connection.Stop()。
2)删除任何消息侦听器,基本上是做一个consumer.MessageListener = null。
3)然后做connection.Close()。
4)做一个连接=空

附加信息 这是我用来测试的样本。

    private void OnException(Exception ex)
    {
        XMSException xmsex = (XMSException)ex;
        Console.WriteLine("Got exception");
        // Check the error code.
        if (xmsex.ErrorCode == "XMSWMQ1107")
        {
            Console.WriteLine("This is a connection broken error");
            stopProcessing = true; // This is a class member variable
        }
    }

在创建连接的方法中,设置异常侦听器。

        // Create connection.
        connectionWMQ = cf.CreateConnection();
        connectionWMQ.ExceptionListener = new ExceptionListener(OnException);

每当出现连接错误时,将调用异常侦听器并将 flag 设置为 true。

在不再需要对象时将其丢弃是一种很好的做法。有父子关系,消费者,生产者等是会话的孩子,而会话又是连接的孩子。因此,处置顺序可以是孩子优先,然后是父母。但是如果一个父母被处置,孩子也被自动处置。

于 2012-10-19T04:27:33.273 回答