根据您的描述,我了解以下内容:
- 你有一个 Node.js 前端。每个 Node.js 框需要限制为最多 100 个客户端
- 你有一个未定义的后端,它与前端的盒子有 GRPC 连接(我们称它们为 EAP)。每个 EAP <-> Node.js GRPS 链接仅限于 N 个并发连接。
我在这里看到的只是服务器级别和连接级别的限制,因此我认为没有理由让任何分布式系统(如 Bull)来管理队列(如果 Node.js 框死了,则没有人能够恢复 HTTP 请求提供对该特定请求的响应的上下文 - 因此,当 Node.js 框死亡时对其请求的响应不再有用)。
考虑到这一点,我只需创建一个本地队列(就像数组一样简单)来管理您的队列。
免责声明:这必须被视为伪代码,以下是简化且未经测试的
这可能是一个队列实现:
interface SimpleQueueObject<Req, Res> {
req: Req;
then: (Res) => void;
catch: (any) => void;
}
class SimpleQueue<Req = any, Res = any> {
constructor(
protected size: number = 100,
/** async function to be executed when a request is de-queued */
protected execute: (req: Req) => Promise<Res>,
/** an optional function that may ba used to indicate a request is
not yet ready to be de-queued. In such case nex request will be attempted */
protected ready?: (req: Req) => boolean,
) { }
_queue: SimpleQueueObject<Req, Res>[] = [];
_running: number = 0;
private _dispatch() {
// Queues all available
while (this._running < this.size && this._queue.length > 0) {
// Accept
let obj;
if (this.ready) {
const ix = this._queue.findIndex(o => this.ready(o.req));
// todo : this may cause queue to stall (for now we throw error)
if (ix === -1) return;
obj = this._queue.splice(ix, 1)[0];
} else {
obj = this._queue.pop();
}
// Execute
this.execute(obj.req)
// Resolves the main request
.then(obj.then)
.catch(obj.catch)
// Attempts to queue something else after an outcome from EAP
.finally(() => {
this._running --;
this._dispatch();
});
obj.running = true;
this._running ++;
}
}
/** Queue a request, fail if queue is busy */
queue(req: Req): Promise<Res> {
if (this._running >= this.size) {
throw "Queue is busy";
}
// Queue up
return new Promise<Res>((resolve, reject) => {
this._queue.push({ req, then: resolve, catch: reject });
this._dispatch();
});
}
/** Queue a request (even if busy), but wait a maximum time
* for the request to be de-queued */
queueTimeout(req: Req, maxWait: number): Promise<Res> {
return new Promise<Res>((resolve, reject) => {
const obj: SimpleQueueObject<Req, Res> = { req, then: resolve, catch: reject };
// Expire if not started after maxWait
const _t = setTimeout(() => {
const ix = this._queue.indexOf(obj);
if (ix !== -1) {
this._queue.splice(ix, 1);
reject("Request expired");
}
}, maxWait);
// todo : clear timeout
// Queue up
this._queue.push(obj);
this._dispatch();
})
}
isBusy(): boolean {
return this._running >= this.size;
}
}
然后您的 Node.js 业务逻辑可能会执行以下操作:
const EAP1: SimpleQueue = /* ... */;
const EAP2: SimpleQueue = /* ... */;
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
// Example 1: this will fail if EAP1 is busy
return EAP1.queue(req);
} else if (req.forEap2) {
// Example 2: this will fail if EAP2 is busy and the request can not
// be queued within 200ms
return EAP2.queueTimeout(req, 200);
}
}
)
app.get('/', function (req, res) {
// Forward request to ingress queue
INGRESS.queue(req)
.then(r => res.status(200).send(r))
.catch(e => res.status(400).send(e));
})
或者此解决方案将允许您(根据要求)也接受对繁忙 EAP 的请求(最多总共 100 个)并在它们准备好时分派它们:
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
return EAP1.queue(req);
} else if (req.forEap2) {
return EAP2.queue(req);
}
},
// Delay queue for busy consumers
req => {
if (req.forEap1) {
return !EAP1.isBusy();
} else if (req.forEap2) {
return !EAP2.isBusy();
} else {
return true;
}
}
)
请注意:
- 在这个例子中,Node.js 将在收到超过 100 个并发请求时开始抛出(在节流时抛出 503 并不罕见)
- 当您有更多节流限制(在您的情况下为 Node.js 和 GRPC)时要小心,因为第一个可能会导致秒数不足(考虑接收 100 个 EAP1 请求,然后接收 10 个 EAP2 请求,Node.js 将充满 EAP1 请求和将拒绝 EAP2 都做 EAP2 什么都不做)