1

使用持久连接侦听消息队列时,侦听器出现错误。我通过点击CTRL-Z退出程序来模拟这一点。尝试重新连接给我一个错误,上面写着:

on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:197)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:70)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:745)
"

我尝试使用以下方法取消订阅并断开我的连接,但它不会断开我的连接。

class MyListener(stomp.ConnectionListener):
    """This is a listener class that listens for new messages using the STOMP protocol"""

    def __init__(self, conn):
        self.conn = conn
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except:
            print('disconnect failed')

如何强制 AMQ 服务器忘记我之前的连接?

4

1 回答 1

1

您不能让代理断开其他客户端资源与其他连接的连接。相反,您应该为客户端和代理配置一个空闲超时值的连接,以便双方在没有套接字检测到关闭的情况下处理远程丢弃。

您还可以为不宣传心跳的 stomp 客户端配置带有心跳宽限期值的代理传输连接器,请参阅 ActiveMQ 代理 STOMP文档

STOMP 规范概述了心跳的工作原理:

于 2020-01-06T21:44:49.503 回答