1

问题:我们有 2 或 3 个应用程序实例。每个实例都有一个生产者和一个消费者。我们必须安排一些流程,为此我们使用常见的 spring 调度程序。这个调度程序产生消息并将它们扔给“代理”(RabbitMQ)。在我们的例子中,我们处理相同的数据 2 或 3 次,因为每个实例都会抛出消息。在第一个生产者抛出消息之前,您将如何阻止实例的生产者?

配置:

<!-- RabbitMQ configuration -->
<rabbit:connection-factory
        id="connection" host="${rabbit.host}" port="${rabbit.port}"      username="${rabbit.username}" password="${rabbit.password}"
        channel-cache-size="${rabbit.publisherCacheSize}" virtual-host="${rabbit.virtualHost}" />

<!-- Declare executor pool for worker threads -->
<!-- Ensure that the pool-size is greater than the sum of all number of concurrent consumers from rabbit that use this pool to ensure
     you have enough threads for maximum concurrency. We do this by ensuring that this is 1 plus the size of the connection factory cache
     size for all consumers -->
<task:executor id="worker-pool" keep-alive="60" pool-size="${rabbit.consumerChannelCacheSize}" queue-capacity="1000" rejection-policy="CALLER_RUNS"/>


<!-- Message converter -->
<bean id="baseMessageConverter" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound" value="com.company.model.Scraper"/>
</bean>

<bean id="messageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg index="0" ref="baseMessageConverter"/>
</bean>


<!-- *********************************producer*********************************** -->
<!-- Outbound company Events -->
<int:channel id="producerChannelCompany"/>
<int:gateway id="jobcompanyCompleteEventGateway" service-interface="com.company.eventing.companyEventPublisher"
             default-request-channel="producerChannelCompany"
             default-request-timeout="2000"
             error-channel="errors"/>

<amqp:outbound-channel-adapter id="companyEvents.amqpAdapter" channel="producerChannelCompany"
                               exchange-name="${rabbit.queue.topic}"
                               routing-key="${rabbit.queue.routing.key}"
                               amqp-template="psRabbitTemplate"/>

<rabbit:admin id="psRabbitAdmin" connection-factory="connection" />
<rabbit:template id="psRabbitTemplate" channel-transacted="${rabbit.channelTransacted}" encoding="UTF-8" message-converter="messageConverter" connection-factory="connection"/>
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false"/>



<!-- *********************************consumer*********************************** -->
<rabbit:queue id="ps.queue" name="${rabbit.queue}"  auto-delete="false" durable="true" exclusive="false"  />


<!-- Exchange to queue binding -->
<rabbit:topic-exchange id="ps.topic" name="${rabbit.queue.topic}" durable="true" auto-delete="false" >
    <rabbit:bindings>
        <rabbit:binding queue="${rabbit.queue}" pattern="${rabbit.queue.pattern}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!-- Configuration for consuming company Complete events -->
<amqp:inbound-channel-adapter id="companyAdapter"
                              channel="companyCompleteEventChannel"
                              queue-names="${rabbit.queue}"
                              channel-transacted="${rabbit.channelTransacted}"
                              prefetch-count="${rabbit.prefetchCount}"
                              concurrent-consumers="${rabbit.concurrentConsumers}"
                              connection-factory="connection"
                              message-converter="messageConverter"
                              task-executor="worker-pool"
                              error-channel="errors"/>



<int:channel id="companyCompleteEventChannel"/>
<int:service-activator id="companyCompleteActivator" input-channel="companyCompleteEventChannel"
                       ref="companyEventHandler" method="runScraper"/>

<bean id="jvmLauncher" class="com.app.company.jvm.JvmLauncher" />
<!-- company Event handler -->
<bean id="companyEventHandler" class="com.app.company.eventing.consumer.companyEventHandler" depends-on="jvmLauncher">
    <!--<property name="scriptHelper" ref="scriptHelper"/>-->
    <property name="jvmLauncher" ref="jvmLauncher" />
    <property name="defaultMemoryOptions" value="${company.memory.opts}"/>
    <property name="defaultMemoryRegex" value="${company.memory.regex}"/>
</bean>


<!-- ERRORS -->
<int:channel id="errors"/>
<int:service-activator id="psErrorLogger" input-channel="errors" ref="psloggingHandler"/>

<bean id="psloggingHandler" class="org.springframework.integration.handler.LoggingHandler">
    <constructor-arg index="0" value="DEBUG"></constructor-arg>
    <!-- <property name="loggerName" value="com.app.travelerpayments.loggingHandler"/> -->
</bean>
4

1 回答 1

0

目前尚不清楚您拥有什么架构,但如果您的所有实例都使用来自同一个队列的消息,则每条消息将仅被使用一次(除非消费者重新排队)。我猜这是在您的情况下使用 AMQP 功能的最佳方式。如果我错过了什么,请澄清你的问题。

使用 a-la fanout 消息传递,当每个实例都有自己的队列和自己的消息堆栈并且您想自己控制消息传递(当然,在几乎所有情况下这都是坏主意),为什么不让所有实例听个人队列绑定到扇出交换并将此交换用于控制消息。您可以告诉实例何时停止或开始消费、刷新队列、安排重启等。

请注意,您还可以通过特定的路由键使用主题交换和绑定队列,例如“control.*”

这个想法是发送who is free请求,选择随机的免费工作人员并向其发送有效负载。您可以使用特定的路由键或仅将有效负载发布到路由键与队列名称相同的默认交换(默认情况下,队列绑定到路由键与队列名称相同的默认交换,请参阅RabbitMQ 文档中的默认交换部分)。

于 2013-07-18T18:11:46.553 回答