3

我正在尝试做一些感觉应该很简单的事情,但事实证明它非常困难。

我有一个订阅 RabbitMQ 队列的功能。具体来说,这是 Channel.consume 函数:http ://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

它返回一个通过订阅 ID 解析的承诺 - 稍后需要取消订阅 - 并且还有一个回调参数以在消息从队列中拉出时调用。

当我想取消订阅队列时,我需要在这里使用 Channel.cancel 函数取消消费者:http ://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel 。这需要先前返回的订阅 ID。

我想将所有这些东西包装在一个 Observable 中,当订阅 observable 时订阅队列,并在取消订阅 observable 时取消订阅。然而,由于调用的“双异步”性质,这被证明有些困难(我的意思是说它们既有回调又返回一个承诺)。

理想情况下,我希望能够编写的代码是:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

但是,这是不可能的,因为此构造函数不支持异步订阅函数或拆卸逻辑。

我一直无法弄清楚这一点。我在这里错过了什么吗?为什么这么难?

干杯,亚历克斯

4

1 回答 1

3

创建的 observable 不需要等待channel.consumepromise 解决,因为仅在您提供的函数中调用观察者(它是传递的观察者,而不是订阅者)。

但是,您返回的取消订阅功能必须等待该承诺解决。它可以在内部执行此操作,如下所示:

return new Rx.Observable((observer) => {
  var consumeResult = channel.consume(queueName, (message) => observer.next(message));
  return () => {
    consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
  };
});
于 2017-04-05T23:22:18.157 回答