我们有 2 台服务器:server1 和 server2。两台服务器都在 Wildfly 11 的域模式配置下运行。这是我们如何配置两台服务器的文档。
在我们的例子中,让我们考虑 server1 域节点。
问题:
如果 2 条具有相同组 ID 的消息同时到达 server1 和 server2,他们将不知道应该将消息发送给哪个消费者。因此,消息最终会被不同的消费者处理,有时首先到达的消息会被稍后处理,这是不可取的。我们想配置系统,使两个节点彼此知道消息应该由哪个消费者处理。
我们尝试过的解决方案:
使用组处理程序 LOCAL 配置 server1,使用 REMOTE 配置 server2。现在,每当消息到达时,本地组处理程序会识别该特定组 ID 的使用者在哪个节点上,并相应地选择消息。
此解决方案在 server1 运行正常之前有效。但是,如果 server1 出现故障,则不会处理消息。为了解决这个问题,我们将 server1 的消息传递子系统 active-mq 添加到 server2 的备份,并对 server2 进行了同样的操作。
/profile=abc/subsystem=messaging-activemq/server=backup:add
(备份服务器将添加到两个节点,因为 server1 是域节点)
我们还向这个备份服务器添加了相同的发现组、http 连接器、广播组。我们为 server1 和 server2 建立了备份和实时服务器的集群连接,使其位于同一组中。
/profile=abc/subsystem=messaging-activemq/server=default/ha-policy=replication-master:add(cluster-name=my-cluster,group-name=${livegroup},check-for-live-server=true)
/profile=abc/subsystem=messaging-activemq/server=backup/ha-policy=replication-slave:add(cluster-name=my-cluster,group-name=${backupgroup})
server1 配置为读取以下属性:
livegroup=group1
backupgroup=group2
server2 配置为读取以下属性:
livegroup=group2
backupgroup=group1
但是,此解决方案似乎无法修复故障转移条件,并且当具有本地组处理程序节点的活动节点关闭时,未在其他节点上处理消息。当 server1 关闭时,我们在 server2 上收到以下错误:
[org.apache.activemq.artemis.core.server] (default I/O-3) AMQ222092: Connection to the backup node failed, removing replication now: ActiveMQRemoteDisconnectException[errorType=REMOTE_DISCONNECT message=null]
at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:533)
at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:682)
at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelInactive(ActiveMQChannelHandler.java:79)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:360)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:325)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1329)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:908)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:744)
at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
请建议任何其他方法来处理该问题,或者我们如何配置具有 LOCAL 组处理程序的服务器关闭的场景。