问题标签 [dramatiq]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 在 dask 或 Dramatiq 中带有 (bind=True) 的芹菜?
我使用 celery 已经有一段时间了,但由于缺乏 Windows 支持,我正在寻找替代方案。
顶级竞争对手似乎是 dask 和 Dramatiq。我真正在寻找的是可以将 1000 个长时间运行的任务分配到 10 台机器上的东西。每个人都应该在完成任务后接下一个工作,并给出一个带有更新的回调(在 celery 中,这可以通过 @task(bind=True) 很好地实现,因为可以访问任务实例本身并且我可以发送状态返回给发送更新的实例)。
Dramatiq 或 dask 中是否有类似的功能?任何建议,将不胜感激。
python - Dramatiq 不会将任务添加到队列中
我正在尝试从我的 Falcon API 方法运行一些戏剧演员,如下所示:
我的代码进入“发送”方法,通过循环没有任何问题。但是队列中没有新任务!Actor 本身没有被调用,并且我的代理中的“默认”队列是空的。如果我设置自定义队列,它仍然是空的。我的演员长这样:
经纪人在哪里
RabbitMQ 日志告诉它连接正常
我在调试器中做了一些额外的研究。它可以很好地获取消息(这意味着要发送到代理),并且Actor.send_with_options () 中的 broker.enqueue 不返回任何异常,尽管我无法真正了解它的内部逻辑。我真的不知道它为什么会失败,但肯定是RabbitmqBroker.enqueue()导致了问题。
Broker 是 Erlang 22.2.1 上的 RabbitMQ 3.8.2,使用默认设置从rabbitmq Docker Hub 映像在 Docker 中运行。Dramatiq 版本是 1.7.0。
在 RabbitMQ 日志中,只有在应用程序启动时连接到代理,而在我关闭它时断开连接,如下所示:
Broker 在__init__.py
主包中定义,在子包中导入。我不确定在所有函数的装饰器中指定相同的代理实例是否可以,但是文档中没有任何禁止它的地方。我想这没关系,因为如果我为每个 Actor 创建新的代理,它仍然不起作用。
我试图将 Redis 设置为代理,但我仍然遇到同样的问题。
这可能是什么原因?
python - 从现有的装饰器创建一个新的装饰器
我正在为我的任务队列使用Dramatiq,它提供了装饰器@dramatiq.actor
来将函数装饰为任务。我尝试编写自己的装饰器来包装@dramatiq.actor
装饰器,这样我就可以向适用于所有任务的装饰器添加一个默认参数@dramatiq.actor
(我正在谈论的参数是priority=100
)。
出于某种原因,我收到以下错误:
如果我用它切换我的自定义@task
装饰器,@dramatiq.actor
那么我猜我的自定义装饰器不正确,但我无法发现我的错误。
装饰器.py
任务.py
视图.py
dramatiq - 戏剧性工人因超出时间限制错误而关闭
当我运行一个工人时,它正在关闭并出现错误“超出时间限制”。如何增加工人的时间限制。
dramatiq - Dramatq - 消费者遇到连接错误
当我尝试从命令行运行 Dramatiq 时,它给了我以下错误。我目前正在使用 RabbitMQ。
这是我到现在为止所做的。
python-3.x - 如何让 Dramatiq 在 Windows / WSL 环境中使用 redis?
我创建了一个简单的脚本来使用 redis 测试 Dramatiq:
我正在使用此命令在 WSL 中运行(按照文档的规定):
它运行没有任何错误,并在控制台上打印出这些行:
但是当我在解释器中导入这个函数并在它上面使用send
方法时......
...它等待几秒钟,然后引发dramatiq.errors.ConnectionClosed
错误:
似乎此调用无法将消息发送到在 WSL 中运行的 Dramatiq。
注意:我还没有安装 RabbitMQ,但如果我要使用 Redis,我认为它不是必需的。
python - Dramatiq worker 和 socketio
我有一个 Flask Web 服务,它必须在后台执行长时间的异步操作,并且需要在完成时通知客户端。
为此,我使用dramatiq
运行异步任务并flask_socketio
使用 websockets 通知客户端。
我的代码如下所示:
第一个发射有效,第二个无效。考虑到第二次发射实际上是由工作人员在单独的进程中执行的,这不足为奇。
如何修改我的代码以使其正常工作?我想了想也试过了callbacks
,但他们也是actors
,所以有什么不同......还有其他想法吗?
python - Web服务用户之间任务执行时间的公平调度
假设我们有以下 Web 服务。主要功能是为给定的网站 URL 进行屏幕截图。有用于输入 URL 的 REST API 和用户界面。每个新 URL 都会在 Celery 中创建一个任务。对于前端 UI,某些 URL 的屏幕将在合理的时间(例如 10 秒)内跟随很重要。
现在,用户有意或由于软件错误输入了数百个 URL。这会使任务队列膨胀,其他用户必须等到所有这些任务都完成。
所以这里的要求是:
- 以某种公平的顺序运行任务。最简单的解决方案是一次为每个用户运行一项任务。比如:user1 任务、user2 任务、user1 任务、user2 任务等等。
- 在任务上有一些优先级。优先级 1 的类似任务总是在优先级 2 的任务之前完成。
目前,我们使用我们手工制作的模块。它将任务存储在 Redis 中,并以公平的顺序将它们推送到 Celery。为了不依赖 Celery 排序,它只推送与可用的空闲 Celery 工作人员一样多的任务,并且每 100 毫秒检查一次 Celery 队列中是否有空闲工作人员。
是否有符合我要求的图书馆或服务?
python-3.x - 如何从 Dramatiq python 中获取处理后的结果?
我正在检查芹菜的替代品,发现了 Dramatiq。我刚刚开始使用 Dramatiq,但无法检索结果。我什至尝试将后端和“save_results”设置为 True。我总是得到这个AttributeError: 'Message' object has no attribute 'get_results'
关于如何获得结果的任何想法?
django - DB 被 Django App 中的后台周期性任务压得喘不过气来
我们有一个 Django 应用程序,它必须定期使用第三方 API 来为一组用户获取大量数据。这些任务执行得很好并且完成了它们的目的。但是一段时间后,我们开始从 postgres 收到太多的连接错误
FATAL:抱歉,已经有太多客户了
信息
该项目是 dockerized 并且所有组件都在单独的容器中运行,包括应用程序和数据库(postgres)。周期性任务由 Dramatiq 执行并由 periodiq 调度。我们还使用 redis 作为系统代理。
我尝试了各种解决方法来使其停止,但它们都没有奏效,包括 SO 中提出的各种解决方案。
尝试 1
我在每次任务执行connection.closes()
之前和之后都使用过,以确保工作人员没有打开任何幽灵连接。
尝试 2
添加任务限制器以限制给定时间的活动连接数并防止数据库不堪重负。虽然这个解决方案显然没有服务于我们实现的实际范围,但它降低了任务执行的性能它对解决问题没有帮助。
尝试 3
增加 Postgres 的池限制。正如这里所建议的,我添加了一个自定义配置文件以增加可用池。这产生了效果,但它只是推迟了错误的显示,并没有避免这种情况按预期发生。我什至达到了 10K 连接的非常高的限制(默认为 10)。我在这里发布配置文件以防万一。
注意该应用程序在具有 24 个内核和 128GB RAM 的服务器上运行,并且在执行任务时使用的资源不超过 1%。
尝试 4
我已将pgpool插入到项目中,以便对数据库的请求进行排队。这可以防止数据库不堪重负,但它不是一个实用的解决方案,因为它导致数据库连接永远等待并且这使得数据库也可用。
尝试 5
使用CONN_MAX_AGE=0
参数来防止 Django 创建持久连接。那也没有任何影响。
尝试 6
尝试使任务在原子连接块上运行。这似乎也没有帮助。
我认为在 Dramatiq 工作程序上以并行线程执行任务的方式导致连接保持打开但空闲。我尝试从 Dramatiq 和 periodiq 容器手动关闭连接,但这在修复连接池问题方面几乎没有效果。
我尝试了在 SO 上找到的所有变体。