0

我正在使用 RocketMQ 并希望从队列中获取每个新请求的消息

@Service
public class GetMessageFromQueue extends BaseObject {
@Resource
private RocketMQTemplate rocketMQTemplate;

@Value("${demo.rocketmq.topic}")
private String springTopic;

private String ms;

public void getMessage(InternalRequest internalRequest) throws MyExeption {
    logger.info("sending message='{}'", inputData.getData());

    CountDownLatch loginLatch = new CountDownLatch (1);
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(inputData.getCorrID());
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    try {
        //get message by TAG to filter only messsage I want
        consumer.subscribe(springTopic, inputData.getCorrID());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                logger.info("consumeMessage " + msgs.size());
                for (MessageExt messageExt : msgs) {
                    logger.info("consumeMessage " + new String(messageExt.getBody()));
                    ms = new String(messageExt.getBody());
                }
                loginLatch.countDown ();
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    } catch (MQClientException e) {
        e.printStackTrace();
        logger.error("e " + e);
    }

    try {
        loginLatch.await ();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    consumer.shutdown();

    logger.info("ms " + ms);
    if(!inputData.getCorrID().equals(ms)) {
        throw new MyExeption ("500",ms,ms);
    }
    logger.info("get done " + ms);
}
}

此代码在正常情况下工作正常。但是,如果我运行许多并发线程,有时 ms varrable 并不能准确地获得来自 MessageListenerConcurrently 回调的值。

我哪里错了?我如何在这里等待来自消费者的结果?

4

1 回答 1

0

就我而言,这个计划实际上很奇怪。让我指出一些错误的东西,以便您找出实际问题。

  1. 您有一个初始值为 1 的锁存器。消费者是来自消费者线程池的消费消息,消息可能很多,这意味着消费者可以并行消费 2 条两条消息,比如 msg1 和 msg2 ,变量可以被赋值为 msg1 或 msg2 ,这是不确定的。

  2. 每次调用您的方法时都会创建消费者并关闭。所以消费者偏移量可能没有及时更新到broker,你的消费者关闭了,这意味着下次你用同一个消费者组启动你的消费者时,会再次消费相同的消息。

  3. 消费者设置为 CONSUME_FROM_FIRST_OFFSET,这真的是你想要的吗?这意味着如果有 10 条消息已经发送到主题,您可能总是在新消费者(具有新消费者组)启动时消费第一个

于 2020-04-30T02:36:57.390 回答