0

最后改写

NodeJS 通过 GRPC 与其他 API 通信。

每个外部 API 都有自己与 Node 的专用 GRPC 连接,并且每个专用 GRPC 连接都有一个可以同时服务的并发客户端的上限(例如,外部 API 1 的上限为 30 个用户)。

对 Express API 的每个请求,可能需要与External API 1External API 2External API 3(从现在开始,EAP1EAP2等)进行通信,并且 Express API 还具有并发客户端的上限(例如 100 个客户端) ) 可以为 EAP 提供食物。

所以,我是如何考虑解决这个问题的:

  1. 客户端向 Express API 发出新请求。

  2. 中间件queueManager为客户端创建 Ticket(将其视为批准访问系统的 Ticket - 它具有客户端的基本数据(例如名称))

  3. 客户端获取票证,创建一个事件侦听器,该事件侦听器以他们的票证 ID 作为事件名称来侦听事件(当系统准备好接受票证时,它会产生票证的 ID 作为事件)并进入一个“大厅”,其中,客户,只需等待他们的票证 ID 被接受/宣布(事件)。

我的问题是我真的想不出如何实现系统跟踪票证的方式以及如何根据系统的并发客户端建立一个队列。

在此处输入图像描述

在授予客户端访问系统的权限之前,系统本身应该:

  1. 检查 Express API 是否已达到其并发客户端的上限 -> 如果是这样,它应该等待新的 Ticket 位置可用
  2. 如果有新职位可用,它应该检查工单并找出它需要联系的 API。例如,如果它需要联系EAP1,它应该检查当前有多少客户端使用 GRPC 连接。这已经实现(每个外部 API 都在一个包含所有所需信息的类下)。如果EAP1已达到其上限,那么 NodeJS 应该稍后再试一次(但是,要晚多少?我应该在系统完成对 EAP1 的另一个请求后发出系统事件吗?)

我知道Bull,但我不确定它是否符合我的要求。我真正需要做的是让客户排队,并且:

  1. 检查 Express API 是否已达到其并发用户上限
  • 如果一个位置是空闲的,从 Ticket 的数组中 pop() 一个 Ticket
  1. 检查 EAPx 是否已达到其并发用户上限
  • 如果为真,请尝试另一个需要与不同 EAP 通信的票证(如果有)
  • 如果为 false,则授予访问权限

编辑:另一个想法可能是拥有两个公牛队列。一种用于 Express API(其中选项“并发”可以设置为 Express API 的上限),另一种用于 EAP。每个 EAP 队列将有一个不同的工作人员(以设置上限)。

改写

为了更好地描述这个问题,我将尝试重新表述需求。

系统的简单视图可能是: 在此处输入图像描述

我使用了 Clem 的建议(RabbitMQ),但同样,我无法通过限制(上限)实现并发。

所以,

  1. 客户要求从TicketHandler. 为了让 TicketHandler 构造一个新的 Ticket,客户端连同其他信息一起提供callback
TicketHandler.getTicket(variousInfo, function () {
    next();
  })

系统将使用回调来允许客户端与 EAP 连接。

  1. TickerHandler 获取票证:

    i) 将其添加到队列中

    ii) 当票证可以被访问时(没有达到上限),它会询问相应的 EAP Handler 客户端是否可以使用 GRPC 连接。如果是,则要求 EAP 处理程序锁定一个位置,然后它调用工单的可用回调(来自步骤 1) 如果不是,TicketHandler 检查需要联系不同 EAP 的下一个可用工单。这应该一直持续到第一个通知 TicketHandler“没有可用位置”的 EAP 处理程序向 TicketHandler 发送消息以通知它“现在有 X 个可用位置”(或“1 个可用位置”)。然后 TicketHandler,应该检查之前无法访问 EAPx 的票证,并再次询问 EAPx 是否可以访问 GRPC 连接。

4

1 回答 1

0

根据您的描述,我了解以下内容:

  • 你有一个 Node.js 前端。每个 Node.js 框需要限制为最多 100 个客户端
  • 你有一个未定义的后端,它与前端的盒子有 GRPC 连接(我们称它们为 EAP)。每个 EAP <-> Node.js GRPS 链接仅限于 N 个并发连接。

我在这里看到的只是服务器级别和连接级别的限制,因此我认为没有理由让任何分布式系统(如 Bull)来管理队列(如果 Node.js 框死了,则没有人能够恢复 HTTP 请求提供对该特定请求的响应的上下文 - 因此,当 Node.js 框死亡时对其请求的响应不再有用)。

考虑到这一点,我只需创建一个本地队列(就像数组一样简单)来管理您的队列。

免责声明:这必须被视为伪代码,以下是简化且未经测试的

这可能是一个队列实现:

interface SimpleQueueObject<Req, Res> {
  req: Req;
  then: (Res) => void;
  catch: (any) => void;
}


class SimpleQueue<Req = any, Res = any> {

  constructor(
    protected size: number = 100,
    /** async function to be executed when a request is de-queued */
    protected execute: (req: Req) => Promise<Res>,
    /** an optional function that may ba used to indicate a request is
     not yet ready to be de-queued. In such case nex request will be attempted */
    protected ready?: (req: Req) => boolean,
  ) { }

  _queue: SimpleQueueObject<Req, Res>[] = [];
  _running: number = 0;

  private _dispatch() {
    // Queues all available
    while (this._running < this.size && this._queue.length > 0) {
      // Accept
      let obj;
      if (this.ready) {
        const ix = this._queue.findIndex(o => this.ready(o.req));
        // todo : this may cause queue to stall (for now we throw error)
        if (ix === -1) return;
        obj = this._queue.splice(ix, 1)[0];
      } else {
        obj = this._queue.pop();
      }
      // Execute
      this.execute(obj.req)
        // Resolves the main request
        .then(obj.then)
        .catch(obj.catch)
        // Attempts to queue something else after an outcome from EAP
        .finally(() => {
          this._running --;
          this._dispatch();
        });
      obj.running = true;
      this._running ++;
    }
  }

  /** Queue a request, fail if queue is busy */
  queue(req: Req): Promise<Res> {
    if (this._running >= this.size) {
      throw "Queue is busy";
    }

    // Queue up
    return new Promise<Res>((resolve, reject) => {
      this._queue.push({ req, then: resolve, catch: reject });
      this._dispatch();
    });
  }

  /** Queue a request (even if busy), but wait a maximum time
   * for the request to be de-queued */
  queueTimeout(req: Req, maxWait: number): Promise<Res> {
    return new Promise<Res>((resolve, reject) => {
      const obj: SimpleQueueObject<Req, Res> = { req, then: resolve, catch: reject };
      // Expire if not started after maxWait
      const _t = setTimeout(() => {
        const ix = this._queue.indexOf(obj);
        if (ix !== -1) {
          this._queue.splice(ix, 1);
          reject("Request expired");
        }
      }, maxWait);
      // todo : clear timeout
      // Queue up
      this._queue.push(obj);
      this._dispatch();
    })
  }

  isBusy(): boolean {
    return this._running >= this.size;
  }

}

然后您的 Node.js 业务逻辑可能会执行以下操作:

const EAP1: SimpleQueue = /* ... */;
const EAP2: SimpleQueue = /* ... */;

const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
  100,
  // Forward request to EAP
  async req => {
    if (req.forEap1) {
      // Example 1: this will fail if EAP1 is busy
      return EAP1.queue(req);
    } else if (req.forEap2) {
      // Example 2: this will fail if EAP2 is busy and the request can not
      // be queued within 200ms
      return EAP2.queueTimeout(req, 200);
    }
  }
)

app.get('/', function (req, res) {
  // Forward request to ingress queue
  INGRESS.queue(req)
    .then(r => res.status(200).send(r))
    .catch(e => res.status(400).send(e));
})

或者此解决方案将允许您(根据要求)也接受对繁忙 EAP 的请求(最多总共 100 个)并在它们准备好时分派它们:

const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
  100,
  // Forward request to EAP
  async req => {
    if (req.forEap1) {
      return EAP1.queue(req);
    } else if (req.forEap2) {
      return EAP2.queue(req);
    }
  },
  // Delay queue for busy consumers
  req => {
    if (req.forEap1) {
      return !EAP1.isBusy();
    } else if (req.forEap2) {
      return !EAP2.isBusy();
    } else {
      return true;
    }
  }
)

请注意:

  • 在这个例子中,Node.js 将在收到超过 100 个并发请求时开始抛出(在节流时抛出 503 并不罕见)
  • 当您有更多节流限制(在您的情况下为 Node.js 和 GRPC)时要小心,因为第一个可能会导致秒数不足(考虑接收 100 个 EAP1 请求,然后接收 10 个 EAP2 请求,Node.js 将充满 EAP1 请求和将拒绝 EAP2 都做 EAP2 什么都不做)
于 2021-09-20T17:16:32.647 回答