0

我是RabbitMQ的新手。我需要一个 MOM 系统来实现这些目的:

  1. 在我的逻辑成功执行之前,会使用已发布的消息。
  2. 在我的逻辑成功执行之前,代理不必从队列中删除已发布的消息。

对于这些目标,我在第一次尝试时编写了以下代码:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
            new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)throws IOException
                {
                    try
                    {
                        channel.txSelect();
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        System.out.println("Recieve Message is :" + new String(body));
                        int reslt = //execute my logic   
                        if(result == 0)
                            channel.txCommit();
                        else
                            channel.txRollback();
                    }
                    catch(Throwable t)
                    {
                        t.printStackTrace();
                    }
                }
            });

通过这种方法,我达到了第二个目的,换句话说,代理不会删除我的消息,但是有一次队列中的所有消息都被消费并全部回滚,代理不会再次向我的消费者发送消息.

在第二次尝试时,我编写了以下代码:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
            new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)throws IOException
                {
                    try
                    {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        System.out.println("Recieve Message is :" + new String(body));
                        int reslt = //execute my logic   
                        if(result == 0)
                            channel.basicAck(deliveryTag, false);
                        else
                            channel.basicNack(deliveryTag,false,true);
                    }
                    catch(Throwable t)
                    {
                        t.printStackTrace();
                    }
                }
            });

通过这个解决方案,我实现了这两个目标,但我不知道我的代码是否正确?该方法是否会在高 TPS 的生产环境中引起问题?不知道basicNack方法的requeue flag是重还是轻?

4

1 回答 1

0

几个月前我有同样的要求。我经历了许多解决方案,这对我有用。

我将交付标签存储在内存中的某个位置,如果我的逻辑顺利,我会手动确认消息或拒绝该消息。为此,我使用了以下方法。

if (success)
connectionModel.BasicAck(Convert.ToUInt64(uTag), false);
else
connectionModel.BasicReject(Convert.ToUInt64(uTag), true);

上面的代码在我的生产中运行良好,速度为 5000msg/sec。好吧,basicNack方法给我带来了问题。

于 2017-08-03T09:34:30.957 回答