问题标签 [bullmq]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
385 浏览

node.js - 我需要在bulmq 中实现启动和停止工作队列功能

我需要使用bulmq 实现一项功能来启动和停止工作队列。

完整的场景是当有人按下前端的开始按钮时。作业将被添加到工作人员中,队列将启动并处理它。在工作人员执行过程中,如果有人按下停止按钮。它需要停止或可以说暂停队列作业。然后如果有人会重新开始工作。它应该以相同的队列作业重新开始。

我真的很感激任何帮助,在此先感谢。

0 投票
0 回答
12 浏览

bullmq - 通过 queue.add 中传递的数据识别可重复的作业

queue.add()如果我理解正确,可通过传递选项来创建可重复的作业,repeat例如此处所述。

但是,在调用时,不会返回getRepeatableJobs传递给的所有数据。queue.add()相反,我只得到密钥、名称、ID 等。

在我的用例中,我正在创建一个可重复的作业,同时type在作业数据中传递一个字段。如果我想取消该类型的所有可重复作业,我将如何去做呢?到目前为止,我能想到的唯一方法是获取所有延迟的作业,过滤具有正确类型的作业,然后检索所有可重复的作业并“以某种方式”将它们的键/ID 与延迟的作业相关联。

有没有更好的方法来实现这一目标?如果不是,将延迟的工作与其可重复的工作关联起来最安全的方法是什么?它是基于 MD5 的getRepeatJobId吗?

0 投票
1 回答
599 浏览

node.js - 删除bulmq中特定的可重复作业(使用jobid)

我需要从队列中删除特定的个人作业。实际情况是我已将作业添加到队列中,然后在某些 API 调用中我需要从队列中删除该单个作业。所以,以后不会再重复了。

添加:

消除:

其中 request.jobKey 是 add 中的 job.id。

0 投票
0 回答
459 浏览

node.js - Heroku 不会连接到 redis 去 [ioredis] 未处理的错误事件:错误:连接 ECONNREFUSED 127.0.0.1:6379

我正在使用 Bullmq 和 redistogo 创建一个队列。它在本地运行良好,但是当我部署到 heroku 时,我不断收到如下所示的错误。有什么明显的我做错了吗?

我已经按照以下文档设置了 redistogo:https ://devcenter.heroku.com/articles/redistogo

我已经包含了 Procfile,它显示了 worker dyno 通过调用 index.js 开始。

在日志中,我包含了 REDISTOGO_URL 以表明它正在通过。

档案:

index.js

worker.js

处理器.js

Heroku 日志中的错误:

0 投票
0 回答
64 浏览

bullmq - 将bulmq 流用于有条件的作业

我正在使用 BullMQ从日历样式 API 获取数据。有一个调用来获取多天的数据,但有时几天无法返回数据,所以我需要回退到每天的 API 调用,然后最后对整个结果集执行一周的操作。我想避免有条件地添加新工作(安排在缺失的日子里获取数据),而是在开始时建立整个流程,然后如果每周的电话得到他们的数据,则跳过所有的每日电话,然后在流程的根本工作。

所以我会运行“get-whole-week”工作(作为某事的孩子),然后如果缺少天数,则运行 7 个相应的日常工作。也就是说,我会删除这些工作在我获得数据的日子里。但是如何将其映射为父子关系呢?除了作为父子之外,似乎没有办法添加要按顺序运行的多个作业,但是如果我将每日调用和每周调用添加为兄弟姐妹,那么所有作业都将同时运行,并且如果我已经有了他们的数据,就无法有条件地跳过工作。

0 投票
0 回答
56 浏览

node.js - How to achieve 100% Round Robin with BullMQ?

We are using BullMQ to process our orders.

We really need each worker to be called exactly once per cycle. so if for example we have 20 incoming orders while 10 connected workers, each worker should process exactly 2 orders etc.

Right now, the round robin seems a little bit broken, it honestly seems random instead of RoundRobin, am I missing something?

Thank you in advance.

0 投票
1 回答
137 浏览

nestjs - NestJS Bull Log 错误到 Sentry

我最近将 Bull 添加到我的项目中,以卸载诸如将文档同步到 3rd 方服务之类的事情,并且一切都运行良好,除了处理作业时发生的错误不会最终出现在 Sentry 中。他们只登录作业本身,但由于我们在多个配置上运行我们的应用程序,这意味着我必须不断监视所有这些实例以查找作业处理错误。

我知道我可以向处理器添加错误处理程序,但我已经有很多处理器了,所以我更喜欢另一个更全局的解决方案

有什么方法可以确保这些错误也发送给 Sentry?

0 投票
2 回答
420 浏览

node.js - Bullmq 不会通过 TLS 从 Redis 中提取工作

我正在使用来自https://www.npmjs.com/package/bullmqconst queue = new Queue('Paint')页面的非常基本的示例 ( ) - 一切正常(默认为)。localhost:6379

但是,当我添加new Queue('Paint', { connection })基于对 Redis ( rediss://...) 的 TLS 访问的连接 () 时,我仍然可以将作业推送到队列中(我在 Redis 本身中看到),但是这些作业都不会被工作人员拉取。

也许我错过了一些隐藏的标志?

谢谢!

-Dror

0 投票
1 回答
600 浏览

node.js - Bull MQ 可重复作业未触发

这个问题与这个线程在 Bull 的给定 cron 时间没有被触发的可重复作业继续

我也面临同样的问题。我应该如何指定时区?我试图指定为重复:{ cron: '* 7 14 * * *', tz: 'Europe/Berlin'}

意思是在德国时区 14:07 触发作业。虽然作业列在队列中,但未触发作业。

我也试过重复:{ cron: '* 50 15 * * *', offset: datetime.getTimezoneOffset(), tz: 'Europe/Berlin'}

0 投票
1 回答
363 浏览

node.js - NodeJS Express API - 票务/队列系统

最后改写

NodeJS 通过 GRPC 与其他 API 通信。

每个外部 API 都有自己与 Node 的专用 GRPC 连接,并且每个专用 GRPC 连接都有一个可以同时服务的并发客户端的上限(例如,外部 API 1 的上限为 30 个用户)。

对 Express API 的每个请求,可能需要与External API 1External API 2External API 3(从现在开始,EAP1EAP2等)进行通信,并且 Express API 还具有并发客户端的上限(例如 100 个客户端) ) 可以为 EAP 提供食物。

所以,我是如何考虑解决这个问题的:

  1. 客户端向 Express API 发出新请求。

  2. 中间件queueManager为客户端创建 Ticket(将其视为批准访问系统的 Ticket - 它具有客户端的基本数据(例如名称))

  3. 客户端获取票证,创建一个事件侦听器,该事件侦听器以他们的票证 ID 作为事件名称来侦听事件(当系统准备好接受票证时,它会产生票证的 ID 作为事件)并进入一个“大厅”,其中,客户,只需等待他们的票证 ID 被接受/宣布(事件)。

我的问题是我真的想不出如何实现系统跟踪票证的方式以及如何根据系统的并发客户端建立一个队列。

在此处输入图像描述

在授予客户端访问系统的权限之前,系统本身应该:

  1. 检查 Express API 是否已达到其并发客户端的上限 -> 如果是这样,它应该等待新的 Ticket 位置可用
  2. 如果有新职位可用,它应该检查工单并找出它需要联系的 API。例如,如果它需要联系EAP1,它应该检查当前有多少客户端使用 GRPC 连接。这已经实现(每个外部 API 都在一个包含所有所需信息的类下)。如果EAP1已达到其上限,那么 NodeJS 应该稍后再试一次(但是,要晚多少?我应该在系统完成对 EAP1 的另一个请求后发出系统事件吗?)

我知道Bull,但我不确定它是否符合我的要求。我真正需要做的是让客户排队,并且:

  1. 检查 Express API 是否已达到其并发用户上限
  • 如果一个位置是空闲的,从 Ticket 的数组中 pop() 一个 Ticket
  1. 检查 EAPx 是否已达到其并发用户上限
  • 如果为真,请尝试另一个需要与不同 EAP 通信的票证(如果有)
  • 如果为 false,则授予访问权限

编辑:另一个想法可能是拥有两个公牛队列。一种用于 Express API(其中选项“并发”可以设置为 Express API 的上限),另一种用于 EAP。每个 EAP 队列将有一个不同的工作人员(以设置上限)。

改写

为了更好地描述这个问题,我将尝试重新表述需求。

系统的简单视图可能是: 在此处输入图像描述

我使用了 Clem 的建议(RabbitMQ),但同样,我无法通过限制(上限)实现并发。

所以,

  1. 客户要求从TicketHandler. 为了让 TicketHandler 构造一个新的 Ticket,客户端连同其他信息一起提供callback

系统将使用回调来允许客户端与 EAP 连接。

  1. TickerHandler 获取票证:

    i) 将其添加到队列中

    ii) 当票证可以被访问时(没有达到上限),它会询问相应的 EAP Handler 客户端是否可以使用 GRPC 连接。如果是,则要求 EAP 处理程序锁定一个位置,然后它调用工单的可用回调(来自步骤 1) 如果不是,TicketHandler 检查需要联系不同 EAP 的下一个可用工单。这应该一直持续到第一个通知 TicketHandler“没有可用位置”的 EAP 处理程序向 TicketHandler 发送消息以通知它“现在有 X 个可用位置”(或“1 个可用位置”)。然后 TicketHandler,应该检查之前无法访问 EAPx 的票证,并再次询问 EAPx 是否可以访问 GRPC 连接。