1

我正在尝试将 WSO2 ESB (4.7.0) 与 WSO2 Message Broker (2.1.0) 集成。

这是我的用例:

  1. 通用 HTTP 客户端将 REST 请求发送到部署在 WSO2 ESB 上的 PassThrough 代理
  2. ESB 代理有一个外序列:将请求转发到真正的 REST 服务,然后,在外序列中,它将响应发送到中介类(部署在 WSO2 ESB 内部)
  3. mediator 类做了一些东西,里面有这个方法,通过它可以将事件发布到 Message Broker 上的主题:

代码:

private void publishEvent(){        
    String topicName = "MyEvent";

    Properties properties = new Properties();
    TopicConnection topicConnection = null;

    properties.put("java.naming.factory.initial", "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");


    String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";

    properties.put("connectionfactory.QueueConnectionFactory", connectionString);

    try {

        InitialContext ctx =  new InitialContext(properties);
        TopicConnectionFactory tcf = (TopicConnectionFactory) ctx.lookup("QueueConnectionFactory");
        TopicConnection connection = tcf.createTopicConnection();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(topicName);
        TopicPublisher publisher= session.createPublisher(topic);
        TextMessage textMessage =
             session.createTextMessage("<asd>sono il publisher di WSO2 message Broker!</asd>");

        publisher.publish(textMessage);
     publisher.close();

    }catch (Exception e){

        e.printStackTrace();
    }
 }
  1. 在另一端,有一个“MyEvent”主题的订阅者已经在运行。

(我从这个 URL 获取了发布者和订阅者的代码:http ://wso2.com/library/articles/2011/12/wso2-esb-example-pubsub-soa/ )

当客户端将 REST 请求发送到代理 (1) 时,调解器被正确调用,但当它尝试执行 PublishEvent 方法 (3) 时,没有任何反应,并且 WSO2 ESB 记录此 530 错误:

INFO - ConnectionCloseMethodHandler ConnectionClose frame received
[2013-10-17 12:43:47,733]  INFO - ConnectionCloseMethodHandler Error :530: not      allowed:Thread-33
[2013-10-17 12:43:47,734] ERROR - AMQStateManager No Waiters for error saving as last  error:Attempt to redeclare exchange: amq.topic of type topic to null.
[2013-10-17 12:43:47,735] ERROR - AMQConnection Throwable Received but no listener set: org.wso2.andes.client.AMQAuthenticationException: Attempt to redeclare exchange: amq.topic of type topic to null. [error code 530: not allowed]
ERROR - AMQStateManager No Waiters for error saving as last error:Attempt to redeclare exchange: amq.topic of type topic to null.
[2013-10-17 20:53:44,996] ERROR - AMQConnection error:
org.wso2.andes.client.AMQAuthenticationException: Attempt to redeclare exchange: amq.topic of type topic to null. [error code 530: not allowed]
at     org.wso2.andes.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:79)
at     org.wso2.andes.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:192)
at     org.wso2.andes.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:140)
at     org.wso2.andes.client.state.AMQStateManager.methodReceived(AMQStateManager.java:111)
at     org.wso2.andes.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:515)
at org.wso2.andes.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:456)
at org.wso2.andes.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:96)
at org.wso2.andes.client.protocol.AMQProtocolHandler$2.run(AMQProtocolHandler.java:466)
at org.wso2.andes.pool.Job.processAll(Job.java:109)
at org.wso2.andes.pool.Job.run(Job.java:157)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
javax.jms.JMSException: Error closing connection:     org.wso2.andes.client.AMQAuthenticationException: Attempt to redeclare exchange: amq.topic of type topic to null. [error code 530: not allowed]
at org.wso2.andes.client.AMQConnection.doClose(AMQConnection.java:920)
at org.wso2.andes.client.AMQConnection.close(AMQConnection.java:855)
at org.wso2.andes.client.AMQConnection.close(AMQConnection.java:846)
at org.wso2.andes.client.AMQConnection.close(AMQConnection.java:841)
at innova.esb.mediator.TopicPublisher.publishMessage(TopicPublisher.java:49)
at     innova.esb.mediator.MediatorEventPublisher.mediate(MediatorEventPublisher.java:35)
at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:78)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:114)
at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.injectMessage(Axis2SynapseEnvironment.java:239)
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:443)
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:166)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:222)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: org.wso2.andes.client.AMQAuthenticationException: Attempt to redeclare exchange: amq.topic of type topic to null. [error code 530: not allowed]
at org.wso2.andes.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:79)
at org.wso2.andes.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:192)
at org.wso2.andes.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:140)
at org.wso2.andes.client.state.AMQStateManager.methodReceived(AMQStateManager.java:111)
at org.wso2.andes.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:515)
at org.wso2.andes.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:456)
at org.wso2.andes.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:96)
at org.wso2.andes.client.protocol.AMQProtocolHandler$2.run(AMQProtocolHandler.java:466)
at org.wso2.andes.pool.Job.processAll(Job.java:109)
at org.wso2.andes.pool.Job.run(Job.java:157)

显然订阅者没有捕捉到任何事件。

怎么了?为什么我有这个连接异常?

非常感谢。

4

0 回答 0