0

嘿:) 我对锁定或互斥行为有疑问。

场景

让我们假设以下场景:

  1. 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间比计划间隔长,这将导致竞争条件
  2. 两条管道非常耗费资源,因此不能并行运行。

可能的解决方案

  • 目前我会在正在运行的服务中使用某种互斥锁或锁,其中管道可以注册并允许执行或不执行。
  • 复制数据以确保每个工作流都可以清理和使用自己的数据。
  • 创建一个本地锁定文件,并确保如果成功,该文件将被删除。
  • 创建一个较小的计划间隔并检查是否存在锁定。如果条件不满足,则干净地退出。

我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流程,例如清理任务和触发其他管道。

谢谢

4

3 回答 3

1

我不熟悉 dagster,但我在其他环境中成功使用的一种机制是利用在类 Unix 系统中 rename 或 mv 是原子操作的事实。对于运行后清理的第一个要求:

  1. 新文件被放入输入目录。一组输入文件可以隔离在它们自己的目录中。

  2. 当管道进程启动时,它的第一个操作是从输入目录中选择一个文件(或目录)并将其 mv 到管道实例拥有的工作目录中。如果输入目录中没有可用的文件,则该进程会优雅地自行关闭。

  3. 如果 mv 成功,则该进程继续对刚刚移动到其工作目录的文件(目录)执行其操作。完成后,它会自行清理,可能通过对其工作目录执行递归删除。

  4. 如果 mv 失败,则意味着另一个进程从该文件下抓取了新文件。失败的过程会优雅地关闭自己。

对于一次只运行一个管道进程的第二个要求,您可以使用独占创建哨兵文件,如果未成功创建哨兵文件,则进程失败并退出。在 python 3 中,代码可能看起来像

try:
    open('sentinel', 'x').close()
except FileExistsError:
    exit("someone else already has sentinel")

do_stuff()

os.remove('sentinel')

当然,如果你的进程在 do_stuff() 中的某个地方崩溃,你必须手动清理哨兵文件,或者你可以使用 atexit 处理程序来确保即使在 do_stuff() 崩溃的情况下也能删除哨兵。 )。

于 2020-11-17T15:22:02.763 回答
1

感谢您分享您的用例。我不认为 Dagster 目前原生支持这些功能。然而,0.10.0 版本(几个月后)将包括运行级队列,允许您对并发管道运行设置限制。目前它只支持对运行的全局限制,但很快将支持添加基于管道标签的规则(例如,标记为“资源重”的管道可以限制为 3 个并发运行)。看起来这可能适合这个用例?

预览当前排队系统的指南在这里。也请随时通过@johann 的 Dagster 闲置与我联系!

于 2020-11-17T17:40:56.930 回答
0

对于场景 #2(处理资源非常繁重且无法并行运行的管道)的建议是使用 dagster 的Celery集成,例如celery_executorcelery_docker_executorcelery_k8s_job_executor(如果你在 kubernetes 上)。

这些工作的方式是 Dagster 管道运行协调器会将每个可靠的执行任务添加到 Celery 队列中,并且 Celery 允许您限制每个队列中活动任务的数量。例如,这通常用于确保在给定时间只有 X 个实体连接到 Redshift。

Dagster 还支持使用多个队列,因此您可以为资源密集型实体创建一个队列,为非资源密集型实体创建另一个队列(具有更高的并发限制)。

关于场景#1,我不确定您的设计限制是什么。一个想法是使用管道运行标记的标记方案来跟踪哪个管道运行对应于哪个文件;然后对于每个文件,执行文件清理的进程首先在删除之前验证是否存在成功的管道运行(通过查询运行数据库)。

于 2020-11-18T01:26:29.487 回答