我正在尝试制作一个通过 AMQP 与 ActiveMQ 集成的 Apache Camel 应用程序。
我一直在使用提供的“ camel-example-spring-jms ”项目,该项目通过标准 TCP 连接,但我已经修改为使用我的独立 ActiveMQ 5.8 安装(而不是嵌入式),我使用 TCP 可以正常工作.
活动 MQ 配置(5672 上的 amqp)
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61610?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
</transportConnectors>
在“ camel-server.xml ”中,我已将现有的“jms”“ActiveMQComponent”替换为“JmsComponent”,该“JmsComponent”引用了“AMQConnectionFactory”,我在其上指定了我的连接 URL(尝试了以下两种变体)。
amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'
amqp://guest:guest@/?brokerlist='tcp://localhost:5672'
<bean id="jmsConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg index="0"
value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="useMessageIDAsCorrelationID" value="true" />
</bean>
使用上面的配置,服务器似乎可以正常启动,但是当我将路由添加到“ ServerRoutes.java ”中的 amqp 队列时,启动时出现错误。
from("amqp:queue:numbers").to("multiplier");
Camel Server 窗口中的错误是:
[nsumer[numbers]] INFO AMQConnection - to broker at tcp://localhost:5672
org.apache.qpid.AMQException: Cannot connect to broker: connect() aborted [error code 200: reply success]
我的 ActiveMQ 窗口中的错误是:
org.apache.activemq.transport.amqp.AmqpProtocolException: Could not decode AMQP frame: hex: 414d51500101000a
Caused by: org.apache.qpid.proton.engine.TransportException: AMQP header mismatch value 1, expecting 0
任何帮助都可以诊断此问题。
谢谢。