解决方案是存储每个远程系统的消费者列表,并在断开连接事件触发取消功能时使用最旧的消费者标签,然后更新给定远程系统的队列消费者列表。
在远程系统连接事件上
import { Replies } from "amqplib";
// bind callback function for queue-specific messages and store returned consumer description
const result: Replies.Consume = await channel.consume(queueName, this.onSomeMessage.bind(this));
// update consumers list for the connected remote system
const consumers: Array<string> | undefined = this.consumers.get(remoteId);
if (consumers === undefined) {
const consumersList: Array<string> = new Array();
consumersList.push(result.consumerTag);
this.consumers.set(remoteId, consumersList);
} else {
consumers.push(result.consumerTag);
}
在远程系统断开事件上
// remove the oldest consumer in the list and update the list itself
// use cancel method of the amqp channel
const consumers = this.consumers.get(remoteId);
if (consumers === undefined) {
// shouldn't happen
console.error(`consumers list for ${remoteId} is empty`);
} else {
const consumerTag = consumers[0];
await this.rxchannel.addSetup(async (channel: ConfirmChannel) => {
await channel.cancel(consumerTag);
consumers.shift();
});
}
代码片段来自某些类的方法实现(如果您想知道“this”)。
版权声明(特别是对于德国同事):此答案中的代码可以在 Beerware (https://en.wikipedia.org/wiki/Beerware)或 MIT 许可(任何人喜欢的)下使用。