1.)从.net客户端,我如何测试客户端是否连接到服务器(即可以发送和接收)是的,我可以在try块内发送消息并捕获随后的异常,但我希望一个更优雅的解决方案。
2) 如何打开、关闭和重新打开连接?在我尝试解决上面的问题 1 时,我发现如果我打开一个连接然后调用 connection.Close() 我无法从连接工厂获得另一个连接(参见下面的代码片段)。我收到错误消息 XMSCC0008
我正在使用一个非常标准的 vanilla MQ 配置。这是我的客户端的连接方式:
ISession session = MQAccess.GetSession(MQAccess.Connection);
IDestination destination = session.CreateTopic(SubTopicName);
Consumer = MQAccess.GetConsumer(session, destination);
Consumer.MessageListener = new MessageListener(HandleMQSubEvent);
MQAccess.Connection.Start();
其中 MQAccess 是一个小型实用程序类。
编辑问题以添加 MQAccess 代码:
public static class MQAccess
{
public static readonly MQConfigurationSectionHandler ConfigSettings;
public static readonly IConnectionFactory ConnectionFactory;
private static readonly IConnection connection;
public static IConnection Connection
{
get { return connection; }
}
static MQAccess()
{
ConfigSettings = (MQConfigurationSectionHandler)
ConfigurationManager.GetSection("mq-configuration");
XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
ConnectionFactory = factory.CreateConnectionFactory();
ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);
if (ConfigSettings.QueueManager == string.Empty)
{
ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
}
else
{
ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
}
connection = GetConnection();
}
public static IConnection GetConnection()
{
return ConnectionFactory.CreateConnection();
}
public static ISession GetSession(IConnection connection)
{
return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
}
public static IMessageProducer GetProducer(ISession session, IDestination destination)
{
return session.CreateProducer(destination);
}
public static IMessageConsumer GetConsumer(ISession session, IDestination destination)
{
return session.CreateConsumer(destination);
}
public static void MQPub(string TopicURI, string message)
{
using (var session = GetSession(Connection))
{
using (var destination = session.CreateTopic(TopicURI))
{
using (var producer = GetProducer(session, destination))
{
producer.Send(session.CreateTextMessage(message));
}
}
}
}
public static void MQPub(string TopicURI, IEnumerable<string> messages)
{
using (var session = GetSession(Connection))
{
using (var destination = session.CreateTopic(TopicURI))
{
using (var producer = GetProducer(session, destination))
{
foreach (var message in messages)
{
producer.Send(session.CreateTextMessage(message));
}
}
}
}
}
}
编辑:将 MQAccess 类重命名为 MQClient。根据 T Rob 的建议将其设为实例类。断开连接方法仍然崩溃并出现上面列出的错误消息
public class MQClient : IDisposable
{
public MQConfigurationSectionHandler ConfigSettings { get; private set; }
public IConnectionFactory ConnectionFactory { get; private set; }
public IConnection Connection { get; private set; }
public IMessageConsumer Consumer { get; private set; }
public IMessageProducer Producer { get; private set; }
// Save sessions as fields for disposing and future subscription functionality
private ISession ProducerSession;
private ISession ConsumerSession;
public string SubTopicName { get; private set; }
public string PubTopicName { get; private set; }
public bool IsConnected { get; private set; }
public event Action<Exception> ConnectionError;
private Action<IMessage> IncomingMessageHandler;
public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler)
{
// Dont put connect logic in the constructor. If we lose the connection we may need to connect again.
SubTopicName = subTopicName;
PubTopicName = pubTopicName;
IncomingMessageHandler = incomingMessageHandler;
}
public string Connect()
{
IsConnected = false;
string errorMsg = string.Empty;
ConfigSettings = (MQConfigurationSectionHandler)
ConfigurationManager.GetSection("mq-configuration");
XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
ConnectionFactory = factory.CreateConnectionFactory();
ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);
if (ConfigSettings.QueueManager == string.Empty)
ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
else
ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
Connection = ConnectionFactory.CreateConnection();
if (!string.IsNullOrEmpty(PubTopicName))
{
ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName));
}
if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null)
{
ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName));
Consumer.MessageListener = new MessageListener(IncomingMessageHandler);
}
try
{
Connection.Start();
Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler);
IsConnected = true;
}
catch (TypeInitializationException ex)
{
errorMsg = "A TypeInitializationException error occured while attempting to connect to MQ. Check the Queue configuration in App.config. The error message is: " + ex.Message;
}
catch (IllegalStateException ex)
{
errorMsg = "An IllegalStateException error occured while attempting to connect to MQ. Check the Queue configuration in App.config. The error message is: " + ex.Message;
}
return errorMsg;
}
public void Disconnect()
{
if (Producer != null)
{
Producer.Close();
Producer.Dispose();
Producer = null;
}
if (ProducerSession != null)
{
// Call Unsubscribe here if subscription is durable
ProducerSession.Close();
ProducerSession.Dispose();
ProducerSession = null;
}
if (Connection != null)
{
Connection.Stop();
//if (Connection.ExceptionListener != null)
// Connection.ExceptionListener = null;
// Per Shashi............
//if (Consumer.MessageListener != null)
// Consumer.MessageListener = null;
Connection.Close();
Connection.Dispose();
Connection = null;
}
if (Consumer != null)
{
if (Consumer.MessageListener != null)
Consumer.MessageListener = null;
Consumer.Close();
Consumer.Dispose();
Consumer = null;
}
if (ConsumerSession != null)
{
// Call Unsubscribe here if subscription is durable
ConsumerSession.Close();
ConsumerSession.Dispose();
ConsumerSession = null;
}
IsConnected = false;
}
public void Publish(string message)
{
Producer.Send(ProducerSession.CreateTextMessage(message));
}
public void Publish(string[] messages)
{
foreach (string msg in messages)
Publish(msg);
}
public void ConnectionExceptionHandler(Exception ex)
{
Disconnect(); // Clean up
if (ConnectionError != null)
ConnectionError(ex);
}
#region IDisposable Members
private bool disposed;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
Disconnect();
disposed = true;
}
}
#endregion
}