0

我有一个应用程序对另一个系统进行 API 调用,它使用 Bull 和 Redis 将这些 API 调用排入队列。

但是,有时它会因大量 API 调用而陷入困境,或者某些东西停止正常工作,我希望用户可以通过一种简单的方法来检查系统是否只是“忙碌”。(否则,如果他们执行了某些操作,但 10 分钟后还没有完成,他们会继续尝试,然后我们会积压更多条目(在某些情况下,他们发布了重复部分的数据问题, ETC。)

以下是队列中成功调用 API 的单个“密钥”的样子:

HSET "bull:webApi:4822" "timestamp" "1639085540683"
HSET "bull:webApi:4822" "returnvalue" "{"id":"e1df8bb4-fb6c-41ad-ba62-774fe64b7882","workOrderNumber":"WO309967","status":"success"}"
HSET "bull:webApi:4822" "processedOn" "1639085623027"
HSET "bull:webApi:4822" "data" "{"id":"e1df8bb4-fb6c-41ad-ba62-774fe64b7882","token":"eyJ0eXAiOiJKV1QiL....dQVyEpXt64Fznudfg","workOrder":{"members":{"lShopFloorLoad":true,"origStartDate":"2021-12-09T00:00:00","origRequiredQty":2,"requiredQty":2,"requiredDate":"2021-12-09T00:00:00","origRequiredDate":"2021-12-09T00:00:00","statusCode":"Released","imaItemName":"Solid Pin - Black","startDate":"2021-12-09T00:00:00","reference":"HS790022053","itemId":"13840402"}},"socketId":"3b9gejTZjAXsnEITAAvB","type":"Create WO"}"
HSET "bull:webApi:4822" "delay" "0"
HSET "bull:webApi:4822" "priority" "0"
HSET "bull:webApi:4822" "name" "__default__"
HSET "bull:webApi:4822" "opts" "{"lifo":true,"attempts":1,"delay":0,"timestamp":1639085540683}"
HSET "bull:webApi:4822" "finishedOn" "1639085623934"

您可以看到在这种情况下处理需要 83 秒。(1639085540 - 1639085623)

我希望能够提供汇总指标,例如:

  • 最近的 API 调用在 X 秒前被添加到队列中
  • 最近一次成功的 API 调用在 X 秒前完成,需要 XX 秒才能完成。

我还希望能够提供 50 个最近的 API 调用的列表,以一种很好的方式格式化并标记为“成功”、“待定”或“失败”。

我对 Redis 和 Bull 还很陌生,我试图弄清楚如何查询这些数据(在 Node.js 中使用 Redis)并将这些数据作为 JSON 返回给应用程序。

我可以拉一个这样的键列表:

// @route   GET /status
async function status(req, res) {
  const client = createClient({
    url: `redis://${REDIS_SERVER}:6379`
  });
  try {
      client.on('error', (err) => console.log('Redis Client Error', err));
      await client.connect();
      const value = await client.keys('*');
      res.json(value)

  } catch (error) {
      console.log('ERROR getting status: ', error.message, new Date())
      res.status(500).json({ message: error.message })
  } finally {
    client.quit()
  }
}

这将返回 ["bull:webApi:3","bull:webApi:1","bull:webApi:2"...]

但是我怎样才能拉出与各个键关联的值呢?

以及如何找到数字最高的密钥,然后拉出“最后 50 个”的详细信息。在 SQL 中,这就像执行ORDER BY key_number DESC LIMIT 50 - 但我不确定如何在 Redis 中执行此操作。

4

2 回答 2

1

所以我想出了如何提取我需要的数据。我并不是说这是一个好方法,我愿意接受建议;但它似乎可以在不改变队列功能的情况下提供过滤后的 JSON 返回所需的数据。

这是它的样子:

// @route   GET /status/:listNum
async function status(req, res) {
  const { listNum = 10} = req.params
  const client = createClient({
    url: `redis://${REDIS_SERVER}:6379`
  });
  try {
      client.on('error', (err) => console.log('Redis Client Error', err));
      await client.connect();
      // Find size of queue database
      const total_keys = await client.sendCommand(['DBSIZE']);
      const upper_value = total_keys;
      const lower_value = total_keys - listNum;
      // Generate array
      const range = (start, stop) => Array.from({ length: (start - stop) + 1}, (_, i) => start - (i));
      var queue_ids = range(upper_value, lower_value)
      queue_ids = queue_ids.filter(function(x){ return x > 0 }); // Filer out anything less than zero
      // Current timestamp in seconds
      const current_timestamp = parseInt(String(new Date().getTime()).slice(0, -3)); // remove microseconds ("now")
      var response = []; // Initialize array
      for(id of queue_ids){ // Loop through queries
        // Query value
        var value = await client.HGETALL('bull:webApi:'+id);
        if(Object.keys(value).length !== 0){ // if returned a value
          // Grab most of the request (exclude the token & socketId to save space, not used)
          var request_data = JSON.parse(value.data)
          request_data.token = '';
          request_data.socketId = '';
          // Grab & calculate desired times
          const processedOn = value.processedOn.slice(0, -3); // remove microseconds ("start")
          const finishedOn = value.finishedOn.slice(0, -3); // remove microseconds ("done")
          const duration = finishedOn - processedOn; // (seconds)
          const elapsedSinceStart = current_timestamp - processedOn;
          const elapsedSinceFinished = current_timestamp - finishedOn;
          // Grab the returnValue
          const return_data = value.returnValue;
          // ignoring queue keys of: opts, priority, delay, name, timestamp
          const object_data = {request_data: request_data, processedOn: processedOn, finishedOn: finishedOn, return_data: return_data, duration: duration, elapsedSinceStart: elapsedSinceStart, elapsedSinceFinished: elapsedSinceFinished }
          response.push(object_data);
        }
      }
      
      res.json(response);

  } catch (error) {
      console.log('ERROR getting status: ', error.message, new Date());
      res.status(500).json({ message: error.message });
  } finally {
    client.quit();
  }
}

它正在循环 Redis 查询,所以我不想将它用于数百个键,但对于 10 个甚至 50 个键,我认为它应该可以工作。

现在我已经求助于获取键的总数并向后工作:

await client.sendCommand(['DBSIZE']);

在我的情况下,它会返回一个比最高键 id 略高的总数(〜一把状态键),但至少接近,然后我只是过滤掉任何不响应。

我看过 ZRANGE 了一下,但我不知道如何让它给我最后一个 id。当我有这样的 Redis 数据库(Bull Queue)时:

雷迪斯

如果我可以运行一个简单的 Redis 命令,它会返回“3”,我可能会使用它。(因为公牛:webApi:3 的数字最高)

(在实际用例中,这可能是 9555 或某个较高的数字;我只想获取存在的编号最高的密钥。)

现在,我将尝试使用上面提出的方法。

于 2021-12-10T21:16:44.680 回答
0

我在这里有点晚了,但是如果您不打算在 Redis 中手动挖掘,我认为 Bull 的 API 尤其是Queue#getJobs()这里有您需要的一切,并且应该容易使用。通常,您不必进入 Redis 来执行任何此类常见任务,这就是 Bull 的用途!

如果我正确理解您的目标,您应该能够执行以下操作:

const Queue = require('bull')

async function status (req, res) {
  const { listNum = 10 } = req.params
  const api_queue = new Queue('webApi', `redis://${REDIS_SERVER}:6379`)
  const current_timestamp_sec = new Date().getTime() / 1000 // convert to seconds

  const recent_jobs = await api_queue.getJobs(null, 0, listNum)
  const results = recent_jobs.map(job => {
    const processed_on_sec = job.processedOn / 1000
    const finished_on_sec = job.finshedOn / 1000
    return {
      request_data: job.data,
      return_data: job.returnValue,
      processedOn: processed_on_sec,
      finishedOn: finished_on_sec,
      duration: finished_on_sec - processed_on_sec,
      elapsedSinceStart: current_timestamp_sec - processed_on_sec,
      elapsedSinceFinished: current_timestamp_sec - finished_on_sec
    }
  })
  res.json(results)
}

这将为您提供队列中最新的numList* 工作。我还没有测试过这个完整的代码,我将把错误处理和自定义字段添加到工作数据中留给你,但它的核心是可靠的,我认为这应该可以满足你的需求,而不必想想 Bull 如何在 Redis 中存储东西。

我还提供了关于如何更好地处理时间戳的建议,您无需进行字符串处理即可将毫秒转换为秒。如果您需要它们是整数,您可以将它们包装在Math.floor().

*无论如何,至少有那么多 - 请参阅下面的第二个注释


几点注意事项:

  • 的第一个参数getJobs()是状态列表,因此如果您想查看刚刚完成的作业,您可以通过['completed']、或已完成和活动、做['completed', 'active']等。如果未提供列表 ( null),则默认为所有状态。
  • 正如我链接的参考资料中提到的,这里的限制是每个州的- 所以你可能会得到比listNum工作更多的回报。对于您的用例来说,这似乎不是问题,但如果是,您可以对返回的列表进行排序(可能按作业 ID)并只返回第一个listNum- 您可以保证至少得到那么多(假设您的队列中有很多工作),并且不会超过6*listNum(因为有 6 个状态)。
  • Bull 的新手可能会对实例化 Queue 对象来做这样的事情感到紧张——但不要这样!Queue 实例本身不做任何事情,它只是给定队列的接口。process()在您调用添加处理器之前,它不会开始处理作业。顺便说一句,这也是您从一个单独的进程添加作业而不是在其中运行队列的方式,但当然,除非您调用add().
于 2021-12-21T23:13:02.023 回答