0

请先看看我的代码。

这是我的测试类,它正在创建 2000 个线程并且这些线程正在发送消息。

public class MessageSenderMultipleThreadMock {
    @Autowired
    MessageList message;
    @Autowired
    MessageSender sender;

    public boolean process() throws InterruptedException {

        for (int i = 0; i < 2000; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {

                    String routingkey = "operation"
                            + UUID.randomUUID().toString();
                    String queueName = UUID.randomUUID().toString();

                    message.setSender(Thread.currentThread().getName());
                    try {
                        sender.sendMessage(routingkey, queueName,
                                "this is message");
                    } catch (InvalidMessagingParameters e) {
                        e.printStackTrace();
                    }

                }
            }).start();
            Thread.sleep(1000);

        }
        Thread.currentThread();
        Thread.sleep(10000);
        return true;
    }
}

消息发送者

这是我的主要消息发送者类

    @Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageList message;
    String queueName = "";
    String routingKey = "";
    @Autowired
    private QueueCreationService service;
    private boolean messageSentFlag;
    String returnedMessage = "";
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());

    public boolean sendMessage(String routingKey, String queueName,
            String messageToBeSent) throws InvalidMessagingParameters {
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);

        else {
            this.routingKey = routingKey;
            this.queueName = queueName;
        }
        service.processBinding(queueName, routingKey);
        message.addMessages(messageToBeSent);
        return execute();
    }

    /*
     * overloaded sendMessage method will use requestMap . RequestMap includes
     * queueName and routingKey that controller provides.
     */
    public boolean sendMessage(Map<String, String> requestMap)
            throws MessagingConnectionFailsException,
            InvalidMessagingParameters {
        this.queueName = requestMap.get("queue");
        this.routingKey = requestMap.get("routingkey");
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        service.processBinding(queueName, routingKey);
        preparingMessagingTemplate();
        return execute();
    }

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

    private String convertMessageToJson(MessageList message) {
        ObjectWriter ow = new ObjectMapper().writer()
                .withDefaultPrettyPrinter();
        String json = "";
        try {
            json = ow.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return json;
    }

    private void executeMessageSending() {
        rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
                convertMessageToJson(message), new CorrelationData(UUID
                        .randomUUID().toString()));

    }

    private void preparingMessagingTemplate() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode,
                    String replyText, String exchange, String routingKey) {
                returnedMessage = replyText;
            }
        });
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack,
                    String cause) {
                System.out.println("*" + ack);

                if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
                    messageSentFlag = ack;
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+  has been successfully delivered");
                } else {
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has not been delivered");

                }
            }
        });
    }
}

消息传递使用的我的配置类

    @Configuration
    @ComponentScan("com.alpharaid.orange.*")
    @PropertySource("classpath:application.properties")

public class MessageConfiguration {

    String content = "";
    @Value("${rabbitmq_host}")
    String host = "";
    String port = "";
    @Value("${rabbitmq_username}")
    String userName = "";
    @Value("${rabbitmq_password}")
    String password = "";
    String queueName = "";
    InputStream input = null;

    @Autowired
    public MessageConfiguration() {
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    @Scope("prototype")
    public QueueCreationService service() {
        return new QueueCreationService();
    }

    @Bean
    @Scope("prototype")
    public RabbitAdmin admin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                this.host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

}

我的问题:

  1. 正如我在服务器上看到的,一些线程正在成功传递消息,而其他线程则没有。

  2. rabbitTemplate 监听器完全不确定(

    rabbitTemplate.setReturnCallback(new ReturnCallback() {

我需要听众每次都工作,因为在此基础上我会尝试再次发送消息

    private boolean execute() {
    for (int i = 0; i < 5 && !messageSentFlag; i++) {
        executeMessageSending();
    }
    return messageSentFlag;
}

我可以看到有时消息被传递了 5 次,因为messageSentFlag是错误的,并且只有在 Confirm 侦听器中才会变为 true。

  1. 请告诉我如何删除队列?因为我有 8000 个队列,我在 rabbitAdmin 中看到了一种删除队列的方法,但它需要队列的名称,而我的队列只是任何随机队列(UUID)

请提供您的想法,我该如何改进它或者有什么解决方法?对于我的应用程序,多线程环境是必须的。

提前致谢。

4

1 回答 1

2

RabbitMQ 仅在消息在特定队列中时才保证消息顺序。

不保证向 RabbitMQ 发送消息的消息顺序,除非您将这些保证落实到位。在许多情况下,即使不是不可能,这也是一件困难的事情——尤其是在像您这样的多线程环境中。

如果您需要保证消息按特定顺序处理,则需要考虑构建或使用重排序器

一般的想法是,您需要在源处为消息编号 - 1、2、3、4、5 等。当您的消费者将消息拉出队列时,您将查看消息编号,看看这是否是一个你现在需要的。如果不是,您将保留该消息并稍后处理它。获得当前正在查找的消息 # 后,您将按顺序处理当前持有的所有消息。

spring 应该有类似 resequencer 的东西可用,尽管我对该生态系统不够熟悉,无法为您指明正确的方向。

于 2015-08-14T12:36:58.227 回答