问题标签 [python-rq]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
514 浏览

python - 将 RethinkDB 文档推送到 RQ 队列时出现 UnpickleError

我从 RethinkDB 数据库中检索的文档中包含时间戳(表示为datetime.datetime具有特殊tzinfo值 type的 Python 对象rethinkdb.ast.RqlTzinfo

将其推送到 RQ 任务队列时,当任务尝试取消时间戳时,我在另一端收到 UnpickleError。

RQ worker 的输出:

我已经设法通过在进入队列之前用一个纪元时间替换时间戳来解决这个问题,但是这是一个权宜之计的解决方案,并且考虑到我必须处理时区等,这使得事情变得更加复杂。将纪元时间恢复到datetime对象中,然后再将其填充回数据库中。

问题出在哪里(是 RethinkDB 的RqlTzinfo对象还是 RQ 的 unpickling 实现),这是一个合法的错误还是只是我的糟糕实现?

0 投票
1 回答
1426 浏览

python - 如何将类呈现为函数?

由于之前还不清楚,我发布了这个场景:

现在在像 rq 这样的任务队列中

我想知道这个 what_function 是什么?我记得 Django 对他们的 CBV 做了类似的事情,所以我使用了这个类比,但不是很清楚。


我有一个像

我需要将它传递到任务队列,所以我可以做类似的事情

我想知道还有什么其他方法可以做到这一点,例如,Django 在他们的基于类的视图中做到这一点,

最终作为函数传递

它是如何完成的?

编辑:详细地说,django 的基于类的视图被转换为函数,因为模式函数中的参数需要一个函数。同样,您可能有一个接受以下参数的函数

但是 callback_function 的功能可能主要在一个类中实现,该类有一个 run() 方法,通过该方法运行进程。

0 投票
1 回答
1950 浏览

python - 我应该如何处理重叠批处理的 RQ 工作人员

我已经开始使用 RQ / Redis 为我的 django 站点构建一些长时间运行的作业的异步执行。我希望做以下事情:

  • 我想要一个模型的每个实例都有一个队列。你可以把这个模型想象成一个 api 用户帐户。(不会很多。最多 15 - 20 个)

  • 我将在队列中平均分配一批任务(从 10 到 500 个)。在第一批完成之前,可以添加多个批次。

  • 对于每个批次,我想为每个没有积极工作的队列启动一个工作人员,并且我想以批处理模式运行这些工作人员,这样一旦他们用完任务,他们就会关闭。

  • 我意识到我不能以批处理模式运行它们,然后我将一直在工作/监听所有队列的工作。这样做的问题是我希望能够动态地添加和删除队列,因此最好在每个批次中启动可用队列。

我意识到我在队列中分配任务似乎很奇怪,但原因是同一队列中的每个任务都必须根据我正在使用的服务进行速率限制/节流(将其视为 API速率限制,但每个队列代表一个不同的帐户)。但就我的目的而言,任务在哪个账户上运行没有区别,所以我不妨在所有账户上并行化。

我面临的问题是,如果我启动一个工作人员并给它一个已经在处理的队列,我现在有两个工作人员在该队列上独立运行,因此我的预期节流率降低了一半。如果该队列上还没有工作人员,我如何才能启动一个工作人员?我可能会找到一个 hacky 解决方案,但我更愿意以“正确”的方式处理它,因为我对队列没有太多经验,所以我想我应该问一下。

我已经在实现我自己的工人类,以便我可以动态控制队列,所以我只需要一种方法来添加逻辑,如果该队列已经在处理,它将不会被赋予新的工人。我的工人的一个简单版本在这里:

0 投票
1 回答
2170 浏览

python-rq - 如何使用 python-rq 明确地为队列赋予优先级

我正在尝试 python-rq 并且我没有看到如何明确地为队列赋予优先级?

优先级是否来自启动工作程序时定义的顺序?

与 queueB 和 queueC 相比, queueA 的优先级是多少?

0 投票
1 回答
250 浏览

fabric - 如何使用 Fabric 启动 python rq 进程

我刚刚开始使用 Fabric 来自动化我们最基本的 python 部署,我们堆栈的一部分是一个作业服务器,它使用 rq ( http://python-rq.org/ ) 作为我们在 redis 之上的队列解决方案。由于某种我无法解释的原因,rq-dashboard 和 rqworker 命令不会在结构内部启动。我可以复制/粘贴 Fabric 正在使用的确切行,它工作得很好,但在 Fabric 中失败了。

Fabric 代码如下所示:

我已经尝试了在脚本中运行 rq 命令的所有变体,等等,我能想到的,但它们都不起作用。奇怪的是,如果我确实将它们放在一个脚本中(只有这些行),并在最后包含一个“ps aux | grep rq”,我会看到 rq 进程......但是一旦 fab 脚本完成,我会检查盒子,流程都消失了。

任何帮助将不胜感激,谢谢。

0 投票
1 回答
576 浏览

python - 长期运行的 redis 作业能否让工人产生并重新排队?

作业是否有可能让工人让出并将自己放回队列的末尾?

redis 队列中的作业是按顺序处理的,长时间运行的作业可能会占用 cpu。它是否有一种模式来决定它已经消耗了足够的时间并且应该让步给队列中的其他项目?

我注意到requeue_jobrq实施中有规定;如果工作“失败”。也许这是一种破解方法?

或者也许有一个可以利用的作业超时?还是这种思维分支只是另一个死胡同?

0 投票
0 回答
353 浏览

python - 从工作函数中更新 python-rq 任务

我正在运行 python-rq 任务,这可能需要几分钟到几小时,具体取决于输入数据。

我可以通过以下方式在作业调度中设置固定超时:

当超时几乎到期时,是否可以从正在运行的任务中延长 python-rq 任务?例如,当超时为 3600 秒且任务运行 3500 秒时?

在 worker 上运行的 python 函数对 c++ 二进制文件进行子进程调用,并将其进度报告给 redis 键,因此理论上它也可以延长该循环中的工作,但我不知道该怎么做。

0 投票
1 回答
2752 浏览

python - python rq 作业队列 - time.sleep()

我正在寻找使用 RQ 运行排队作业,但查看以下示例:

我明白time.sleep(2)了 - 我想知道这是否必须指定。我安排的工作可能需要(有时)一个小时才能完成(这因每个工作而异)。

RQ 是否仍然适用于执行时间差异很大的此类工作?

任何建议都会很棒!

0 投票
1 回答
2021 浏览

python - 在 redis 作业上存储“元”数据不起作用?

我正在尝试测试排队的 redis 作业,但meta数据似乎没有在任务和发起者之间传递。job_id 似乎匹配,所以我很困惑。也许一些新鲜的眼睛可以帮助我解决问题:

任务是根据文档

rqworker日志保存后打印job_id和word

该任务是从单元测试中调用的:

并生成此日志,显示相同的 job_id 和正确的结果,但永远不会填充meta变量。word

我尝试添加很长的延迟,以便日志有机会看到任务started,但未完成状态(以防元在完成时被清除),但它没有任何区别。

知道我错过了什么吗?

0 投票
1 回答
168 浏览

mongodb - PyMongo 为每个查询发送“身份验证”

对于使用 PyMongo MongoClient 运行的每个查询,我都会获得对 MongoDb 的身份验证。这似乎很昂贵/不必要:

据我所知,我使用的是同一个 MongoClient(尽管它隐藏在 MongoEngine 后面)并且在任何时候都没有故意断开它:

当我在验证函数中设置 pdb 断点时,这是堆栈跟踪。我无法弄清楚为什么要求刷新光标需要新的身份验证。我是不是误会了,那是 MongoDb 协议的一部分?我的目标是发送尽可能少的“身份验证”命令,因为现在它们是我在服务器上记录的命令的 50%。

可能有用的其他信息是这些调用来自 Python RQ 工作程序。我正在尝试在 fork 步骤之前建立连接,但那里可能发生了一些事情导致了这种情况。