0

我在 java 中有 RabbitMQ 队列使用者应用程序。下面是声明队列的代码片段:

public static void declareQueue(final String rmqQueueName) {
    try {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 10);
        channel.queueDeclare(rmqQueueName, true, false, false, args);
    } catch (final IOException e) {
        log.warn("Could not declare the queue: " + rmqQueueName + " ", e);
    }
}

消费者代码:

public void consumeBrokerMessage(final String rmqQueueName) throws IOException {

    declareQueue(rmqQueueName);
    log.info(" [*] Waiting for messages. To exit press CTRL+C");
    final Consumer consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(
                final String consumerTag,
                final Envelope envelope,
                final AMQP.BasicProperties properties,
                final byte[] body) throws IOException {
            boolean flag = false;
            final String brokerMessage = new String(body, StandardCharsets.UTF_8);
            final long start = System.currentTimeMillis();
            final OffsetDateTime utcStartDateTime = OffsetDateTime.now(ZoneOffset.UTC);
            final String startDateTime =
                utcStartDateTime.format(
                    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"));
            flag = BrokerMessage.handleBrokerMessage(utility);

            if (flag) {
                log.info("Broker Message has processed");
            } else {
                log.warn("Error while processing broker message");
            }
            final long deliveryTag = envelope.getDeliveryTag();
            // positively acknowledge a single delivery, the message will
            // be discarded
            channel.basicAck(deliveryTag, false);
        }
    };
    channel.basicQos(1);
    channel.basicConsume(rmqQueueName, false, consumer);
}

================

public static Channel createChannel(
        final String rmqHost,
        final String rmqPort,
        final String rmqUser,
        final String rmqPassword,
        final String rmqVHost) {

    final ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(rmqHost);
    factory.setPort(Integer.parseInt(rmqPort));
    factory.setUsername(rmqUser);
    factory.setPassword(rmqPassword);
    factory.setVirtualHost(rmqVHost);
    **factory.setAutomaticRecoveryEnabled(true);**

    final Connection connection;
    Channel rmqChannel = null;
    try {
        connection = factory.newConnection();
        rmqChannel = connection.createChannel();
    } catch (final IOException | TimeoutException e) {
        log.warn("Could not establish rabbitmq connection. ", e);
    }

    return rmqChannel;
}

当我启动我的应用程序时,以最大优先级 10 创建队列。但是当我从 RabbitMQ 管理中删除队列时,队列不会自行恢复并且不会自动重新声明。有谁知道如何在不重新启动应用程序的情况下自动恢复和重新声明队列?

4

0 回答 0