0

我已经定义了这样的承诺......

    const result = await Promise.race([
      new Promise(resolve => {
        consumer.run({
          eachMessage: ({ message }) => {
            const data = JSON.parse(message.value.toString());
            if (data.payload.template
              && data.payload.template.id === '...'
              && data.payload.to[0].email === email) {
              console.log('Should resolve!')
              resolve(data.payload.template.variables.link);
              console.log('resolved');

              consumer.pause();
              consumer.disconnect();
            }
          },
        });
      }),
      new Promise((_, reject) => setTimeout(reject, 3000))
    ]);
    console.log('result is ', result);
    return result;

我可以解决,但它最后没有打印结果,似乎超时和实际承诺都没有按预期工作?这是为什么?我怀疑它与在 kafka js 回调中使用 resolve 有关?


更新:似乎它Promise.race()没有解决,但为什么呢?

4

1 回答 1

1

我怀疑您的“成功方面”承诺无意中抛出,而您正在默默地吞下错误。

使用模型的最小实现consumer(成功或失败 50/50),以下代码有效。

运行代码示例几次以查看这两种情况。

var consumer = {
  interval: null,
  counter: 0,
  run: function (config) {
    this.interval = setInterval(() => {
      this.counter++;
      console.log(`Consumer: message #${this.counter}`);
      config.eachMessage({message: this.counter});
    }, 250);
  },
  pause: function () {
    console.log('Consumer: paused');
    clearInterval(this.interval);
  },
  disconnect: function () {
    console.log('Consumer: disconnected');    
  }
};

Promise.race([
  new Promise(resolve => {
    const expectedMsg = Math.random() < 0.5 ? 3 : 4;
    consumer.run({
      eachMessage: ({ message }) => {
        if (message === expectedMsg) resolve("success");
      }
    });
  }),
  new Promise((_, reject) => setTimeout(() => {
    reject('timeout');
    consumer.pause();
    consumer.disconnect();
  }, 1000))
]).then((result) => {
  console.log(`Result: ${result}`);
}).catch((err) => {
  console.log(`ERROR: ${err}`);
});

我也转移consumer.pause()consumer.disconnect()了“超时端”的承诺,这样消费者就可以保证断开连接,尽管在成功的情况下它可能运行的时间比必要的长一点。

于 2021-05-15T10:10:17.427 回答