0

嗨,我正在与 Mule Any Point Studio 合作。我想定义一个队列名称,并从该队列中使用 AMQP 读取数据。它没有从我提到的队列中轮询数据。

我的骡流: <amqp:connector name="amqpConnector" doc:name="AMQP Connector" host="localhost" port="5672" username="admin" password="admin" validateConnections="true" ></amqp:connector> <flow name="mule-ampq" doc:name="mule-ampq"> <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="newx" queueAutoDelete="true" connector-ref="amqpConnector" doc:name="AMQP" exchangeType="fanout" responseTimeout="10000"/>
<logger message="#[message.payload]" level="INFO" doc:name="Logger"/>
</flow>

我收到以下错误:

ERROR 2014-10-16 15:54:44,452 [main] org.mule.module.launcher.DefaultArchiveDeployer: ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + Failed to deploy artifact 'mule-ampq', see below + ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ org.mule.module.launcher.DeploymentStartException: EOFException: at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:143) at org.mule.module.launcher.artifact.ArtifactWrapper$4.execute(ArtifactWrapper.java:98) at org.mule.module.launcher.artifact.ArtifactWrapper.executeWithinArtifactClassLoader(ArtifactWrapper.java:129) at org.mule.module.launcher.artifact.ArtifactWrapper.start(ArtifactWrapper.java:93) at org.mule.module.launcher.DefaultArtifactDeployer.deploy(DefaultArtifactDeployer.java:26) at org.mule.module.launcher.DefaultArchiveDeployer.guardedDeploy(DefaultArchiveDeployer.java:274) at org.mule.module.launcher.DefaultArchiveDeployer.deployArtifact(DefaultArchiveDeployer.java:294) at org.mule.module.launcher.DefaultArchiveDeployer.deployExplodedApp(DefaultArchiveDeployer.java:261) at org.mule.module.launcher.DefaultArchiveDeployer.deployExplodedArtifact(DefaultArchiveDeployer.java:110) at org.mule.module.launcher.DeploymentDirectoryWatcher.deployExplodedApps(DeploymentDirectoryWatcher.java:287) at org.mule.module.launcher.DeploymentDirectoryWatcher.start(DeploymentDirectoryWatcher.java:148) at org.mule.tooling.server.application.ApplicationDeployer.main(ApplicationDeployer.java:130) Caused by: org.mule.retry.RetryPolicyExhaustedException: null at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:101) at org.mule.transport.AbstractConnector.connect(AbstractConnector.java:1621) at org.mule.transport.AbstractConnector.start(AbstractConnector.java:424) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.mule.lifecycle.phases.DefaultLifecyclePhase.applyLifecycle(DefaultLifecyclePhase.java:237) at org.mule.lifecycle.RegistryLifecycleManager$RegistryLifecycleCallback.onTransition(RegistryLifecycleManager.java:273) at org.mule.lifecycle.RegistryLifecycleManager.invokePhase(RegistryLifecycleManager.java:152) at org.mule.lifecycle.RegistryLifecycleManager.fireLifecycle(RegistryLifecycleManager.java:123) at org.mule.registry.AbstractRegistryBroker.fireLifecycle(AbstractRegistryBroker.java:76) at org.mule.registry.MuleRegistryHelper.fireLifecycle(MuleRegistryHelper.java:136) at org.mule.lifecycle.MuleContextLifecycleManager$MuleContextLifecycleCallback.onTransition(MuleContextLifecycleManager.java:91) at org.mule.lifecycle.MuleContextLifecycleManager$MuleContextLifecycleCallback.onTransition(MuleContextLifecycleManager.java:87) at org.mule.lifecycle.MuleContextLifecycleManager.invokePhase(MuleContextLifecycleManager.java:69) at org.mule.lifecycle.MuleContextLifecycleManager.fireLifecycle(MuleContextLifecycleManager.java:61) at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:278) at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:123) ... 11 more Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:259) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:383) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:403) at org.mule.transport.amqp.AmqpConnector.connectToFirstResponsiveBroker(AmqpConnector.java:443) at org.mule.transport.amqp.AmqpConnector.doConnect(AmqpConnector.java:365) at org.mule.transport.AbstractConnector$5.doWork(AbstractConnector.java:1561) at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:63) ... 29 more Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:244) ... 35 more Caused by: java.io.EOFException at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:104) at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:141) at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:402) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:430) INFO 2014-10-16 15:54:44,455 [main] org.mule.module.launcher.DeploymentDirectoryWatcher: ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + Mule is up and kicking (every 5000ms) + ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

4

1 回答 1

0

试试这个例子......它包含 AMQP 使用的不同的不同交换类型......对于直接消息传递,您不需要在 RabitMQ 中显式创建队列......它将自动创建:-

<http:connector name="HttpConnector" doc:name="HTTP\HTTPS"/>

    <amqp:connector name="amqpConnector" activeDeclarationsOnly="true" ackMode="MULE_AUTO" doc:name="AMQP Connector"/>

    <amqp:connector name="amqpConnectorManualAck" prefetchCount="1"  ackMode="MANUAL" doc:name="AMQP Connector"/>

    <amqp:connector name="mandatoryAmqpConnector" mandatory="true" immediate="true" doc:name="AMQP Connector"/>


   <!-- Direct Messaging -->
   <amqp:connector name="amqp_config" validateConnections="true" virtualHost="/"  username="guest" password="guest" doc:name="AMQP Connector"/>
   <amqp:endpoint exchangeName="directEx" queueName="directQ" routingKey="routing.key" exchangeType="direct" queueDurable="true" name="amqp_direct_endpoint" responseTimeout="10000" doc:name="AMQP"/>
<!--  Direct Messaging -->  


    <jbossts:transaction-manager doc:name="Transaction Manager">  
        <property key="com.arjuna.ats.arjuna.coordinator.defaultTimeout" value="600"></property>  
        <property key="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" value="1000000"></property> 
    </jbossts:transaction-manager>


    <flow name="DefaultSender" doc:name="DefaultSender" >
    <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="1080" path="orders" doc:name="/orders" doc:description="Process HTTP reqests or responses." connector-ref="HttpConnector"/>
    <set-payload value="New Message for Flow1" doc:name="Set Payload"/>
     <logger message="Sending Message to Queue inhouseOrder .. Payload is #[message.payload]" level="INFO" category="DefaultSender" doc:name="Payload Logger" />
    <amqp:outbound-endpoint exchange-pattern="request-response"  exchangeName="directEx"   exchangeType="direct" queueDurable="true" queueName="inhouseOrder" connector-ref="amqpConnector" doc:name="Dispatch to inhouseOrder" />
      <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
    </flow>


   <flow name="DefaultReceiver" doc:name="inhouseOrder" processingStrategy="synchronous" >
        <amqp:inbound-endpoint queueName="inhouseOrder" connector-ref="amqpConnector" exchangeName="directEx"  exchangeType="direct" queueDurable="true" doc:name="inhouseOrder" >
           <amqp:transaction action="ALWAYS_BEGIN" recoverStrategy="REQUEUE" />
        </amqp:inbound-endpoint>
       <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
        <logger message="Receiving Message to Queue inhouseOrder .. Payload is #[message.payload]"  level="INFO" category="DefaultReceiver" doc:name="Payload Logger" />
  </flow> 



   <flow name="FanoutSenderExample2" doc:name="FanoutSenderExample2">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="1080" path="orders3" doc:name="/orders" doc:description="Process HTTP reqests or responses." connector-ref="HttpConnector"/>
        <set-payload value="Fanout Message for Queue accounting" doc:name="Set Payload"/>
     <logger message="Sending Payload in FanoutSenderExample2 #[message.payload]" level="INFO" category="FanoutSenderExample2" doc:name="Payload Logger" />
     <amqp:outbound-endpoint exchangeName="back-end-processing" exchangeType="fanout" exchangeAutoDelete="false" exchangeDurable="true" queueDurable="true" queueExclusive="false" queueAutoDelete="false" exchange-pattern="one-way" connector-ref="amqpConnector" doc:name="Dispatch to back-end-processing" />
     <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
  </flow>


     <flow name="FanoutReceiverExample2" doc:name="FanoutReceiverExample2">
        <amqp:inbound-endpoint exchangeName="back-end-processing" queueName="accounting" exchangeType="fanout" exchangeAutoDelete="false" exchangeDurable="true" queueDurable="true" queueExclusive="false" queueAutoDelete="false" connector-ref="amqpConnector" doc:name="back-end-processing fullfilment queue" />
        <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
        <logger message="Payload received in FanoutReceiverExample2 is:  #[payload]"    level="INFO" category="FanoutReceiverExample2" doc:name="Payload Logger" />
    </flow>

   <!-- Direct Messaging -->
   <flow name="Send_Message_Direct" doc:name="Send_Message_Direct">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="1080" doc:name="HTTP" path="orders5"/>
        <set-payload value="#['im a Direct message'.getBytes()]" doc:name="Set payload for amqp message as ByteArray"/>
        <amqp:outbound-endpoint responseTimeout="10000"  doc:name="Send Direct Message" connector-ref="amqp_config" ref="amqp_direct_endpoint"/>
        <set-payload value="#['Message Sended']" doc:name="Set payload as String"/>
        <logger message="Direct message sended" level="INFO" doc:name="Logger"/>
    </flow>

    <flow name="Recive_Message_Direct" doc:name="Recive_Message_Direct">
        <amqp:inbound-endpoint responseTimeout="10000"  doc:name="Recive Direct Message" connector-ref="amqp_config" ref="amqp_direct_endpoint"/>
        <byte-array-to-string-transformer doc:name="Transform bytearray message to String"/>
        <logger message="I recived a direct message from AMQP: #[payload]" level="INFO" doc:name="Logger"/>
    </flow>

    <sub-flow name="defaultErrorHandler" doc:name="defaultErrorHandler">
        <logger message="Error occurred: #[payload]" level="INFO" doc:name="Log Error"/>
        <smtp:outbound-endpoint host="localhost"  responseTimeout="10000" doc:name="Send Email to Operations"/>
    </sub-flow>

</mule>

参考:- 参考:- https://github.com/mulesoft/mule-transport-amqp/blob/master/GUIDE.md#mule-amqp-transport---user-guide

于 2014-10-16T12:21:41.713 回答