我是RabbitMQ的新手。我需要一个 MOM 系统来实现这些目的:
- 在我的逻辑成功执行之前,会使用已发布的消息。
- 在我的逻辑成功执行之前,代理不必从队列中删除已发布的消息。
对于这些目标,我在第一次尝试时编写了以下代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
channel.txSelect();
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.txCommit();
else
channel.txRollback();
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
通过这种方法,我达到了第二个目的,换句话说,代理不会删除我的消息,但是有一次队列中的所有消息都被消费并全部回滚,代理不会再次向我的消费者发送消息.
在第二次尝试时,我编写了以下代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.basicAck(deliveryTag, false);
else
channel.basicNack(deliveryTag,false,true);
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
通过这个解决方案,我实现了这两个目标,但我不知道我的代码是否正确?该方法是否会在高 TPS 的生产环境中引起问题?不知道basicNack方法的requeue flag是重还是轻?