0

我目前正在评估如何管理多个 Spring 应用程序上下文(即 Tomcat 集群)之间的事件同步的一些想法。

在该用例中,每个上下文都可以产生一个事件 X(无论是Spring 上下文事件还是自制事件),必须将其广播到所有其他上下文实例。该事件仅在其“生命周期”中有效,这意味着我不想以任何方式保留它(因为它们很多,并且它们的状态将在几分钟后过时)。

我的想法是使用已经存在的 RabbitMQ 实例。但是,标准的生产者/消费者模式不适合,因为事件应该广播给所有消费者。每个消费者都是生产者……就像聊天室一样。

Q #1:这可能与 RabbitMQ(+Spring 集成)吗?如何构建这样的广播消息设置?

Q #2:这是否可能?有没有人有更好的解决方案/想法?

用例:每个 Web 应用程序上下文都可以产生诸如“用户 x 邀请用户 y”之类的事件,这些事件应该通过 Websocket 或 EventSource 或其他方式尽快传输到用户的浏览器。因为这是(已经)运行的请求,所以操作的地方(服务器 1)可能不是消费的地方(服务器 2)。

解决方案的主要目标是:

  1. (相对)快速且可扩展。
  2. 火与忘。如果消息已发送,则将其销毁。如果消息无关紧要,请忘记它。没有坚持。在 RabbitMQ 中,可以使用 TTL 来实现。
  3. 该消息正在针对模型进行序列化,即使用杰克逊或类似的东西。
  4. 无需根据(活动)上下文/节点的数量进行手动配置。如果我要添加额外的上下文(Web 服务器、后端进程等),我不想修改同步设置。

更新 1

在阅读了Gary Russell的 anwser 之后,我玩了一点。

    <beans profile="rabbit">
    <rabbit:connection-factory id="connectionFactory" channel-cache-size="10" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" virtual-host="${rabbitmq.virtualhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="eventQueue" name="${rabbitmq.queue.springevents}" auto-delete="false" durable="true"></rabbit:queue>

    <bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="exchange" value="${rabbitmq.springevents.exchange.fanout}"/>
        <property name="replyTimeout" value="${rabbitmq.replyTimeout}"/>
    </bean>

    <!-- Receiving -->
    <int-amqp:inbound-channel-adapter connection-factory="connectionFactory" channel="mqEventInChannelJson"
                                      queue-names="${rabbitmq.queue.springevents}"/>

    <!-- Sending -->
    <int-amqp:outbound-channel-adapter channel="mqEventOutChannelJson" amqp-template="amqpTemplate" routing-key=""
                                       exchange-name="${rabbitmq.springevents.exchange.fanout}" />
</beans>

频道 mqEventOutChannelJson -> 兔子交换 (amqp.fanout) ->

如果我使用这种配置,多个并行启动将跳过事件,因为所有正在运行的进程都在同一个队列(rabbitmq.queue.springevents)上运行。是否有可能在不为每个节点提供不同配置的情况下创建自定义队列名称?

我已经使用单独的 Virtualhost 和交换amqp.fanout对其进行了测试。与特定的扇出交换相同。

更新 2

为了确保每个消费者都有自己的队列,我为每个消费者创建了一个唯一的应用程序 ID。

bean应用程序本身创建了一个唯一标识符:

@Component("application")
public class Application {

private String id;

@PostConstruct
public void initialize() {
    id = "app" + Math.round(1000 * Math.random());
}

public String getId() {
    return id;
}

}

鉴于此,我可以动态创建一个在公共交易所注册的唯一队列。无需外部配置步骤。

    <util:property-path id="applicationId" path="application.id" />
    <rabbit:queue id="eventQueue" name="${rabbitmq.queue.springevents}_#{applicationId}" auto-delete="true" durable="true" exclusive="true">
        <rabbit:queue-arguments>
            <!-- Attention if you want to declare mixed value types: https://jira.springsource.org/browse/AMQP-198 -->
            <entry key="x-message-ttl">
                <value type="java.lang.Long">${rabbitmq.queue.ttl}</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
4

1 回答 1

1

You can use a RabbitMQ topic exchange or a fanout exchange and have each consumer bind a queue to it. Completely supported by Spring AMQP and Spring Integration.

于 2013-03-29T13:06:19.503 回答