I am using RocketMQ and want to fetch a message from the queue for each new request.
@Service
public class GetMessageFromQueue extends BaseObject {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.rocketmq.topic}")
    private String springTopic;

    private String ms;

    public void getMessage(InternalRequest internalRequest) throws MyExeption {
        logger.info("sending message='{}'", inputData.getData());
        CountDownLatch loginLatch = new CountDownLatch(1);
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(inputData.getCorrID());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            // subscribe by tag so that only the message I want is delivered
            consumer.subscribe(springTopic, inputData.getCorrID());
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    logger.info("consumeMessage " + msgs.size());
                    for (MessageExt messageExt : msgs) {
                        logger.info("consumeMessage " + new String(messageExt.getBody()));
                        ms = new String(messageExt.getBody());
                    }
                    loginLatch.countDown();
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            logger.error("e " + e);
        }
        try {
            loginLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumer.shutdown();
        logger.info("ms " + ms);
        if (!inputData.getCorrID().equals(ms)) {
            throw new MyExeption("500", ms, ms);
        }
        logger.info("get done " + ms);
    }
}
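This code works fine under normal conditions. However, when I run many concurrent threads, the ms variable sometimes does not hold the value that the MessageListenerConcurrently callback received for that request.
What am I doing wrong? How can I wait for the result from the consumer here?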
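One thing that may be worth checking: a Spring @Service bean is a singleton by default, so the ms field above is shared by every thread that calls getMessage, and one request's callback can overwrite the value another request is about to read. The sketch below is only an illustration of keeping the result local to each call, not a drop-in fix. It uses plain JDK classes and no RocketMQ API; the class name PerCallResultDemo, the simplified getMessage(String) signature, and the simulated callback are all hypothetical stand-ins for the real listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the service; no RocketMQ classes are involved.
class PerCallResultDemo {

    // Each call owns its own future, so concurrent calls share no mutable state.
    static String getMessage(String corrId) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // Simulated callback on another thread. In the real code, the
        // consumeMessage listener would call result.complete(body) instead
        // of assigning to the shared ms field.
        CompletableFuture.runAsync(() -> result.complete(corrId));

        try {
            // Bounded wait, so a message that never arrives cannot block forever.
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for " + corrId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no message received for " + corrId, e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService callers = Executors.newFixedThreadPool(16);
        List<Future<Boolean>> checks = new ArrayList<>();

        // Fire many concurrent requests, each with its own correlation id.
        for (int i = 0; i < 200; i++) {
            String corrId = "corr-" + i;
            checks.add(callers.submit(() -> corrId.equals(getMessage(corrId))));
        }

        // Every caller should get back exactly the value for its own id.
        for (Future<Boolean> check : checks) {
            if (!check.get()) {
                throw new IllegalStateException("a caller received another request's message");
            }
        }
        callers.shutdown();
        System.out.println("all calls received their own message");
    }
}
```

In the original class the same idea would mean replacing the ms field with a local CompletableFuture (or another per-call holder) that the listener completes. Whether that alone resolves the behaviour depends on how the consumer groups and tags are set up, which this sketch does not model.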