问题标签 [dagster]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1609 浏览

dagster - 如何避免在某些条件下运行 dagster 管道的其余部分

假设我在管道上连接了 Dagster 中的两个固体。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了达到这个结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余的实体。

引发错误以解决我的用例似乎很棘手,有没有一种方法可以跳过管道其余部分的执行而不诉诸异常?

在这个非常人为的示例中,实体 2 仅应在第一个实体的输出为奇数时执行。

在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不要将执行标记为失败,而是标记为成功。可以在每个下游实体中检查数据是否满足条件,但这很快就会增加样板。当接收数据的实体找不到要处理的数据时,最好有一种方法跳过所有下游实体的执行。

0 投票
1 回答
767 浏览

dagster - dagster 主进程的工作目录是否与调度程序进程不同

我在从 dagster 代码(设置,而不是管道)加载文件时遇到问题。假设我有以下项目结构:

当我在项目文件夹($cd project && dagit -y app/repository.yaml)中运行 dagit 时,该文件夹成为工作目录,并且在其中repository.py我可以加载一个知道根目录的文件project

但是,如果我设置了计划,则项目中的管道不会运行。检查 cron 日志似乎该open行引发了一个未找到文件的异常。我想知道是否会发生这种情况,因为执行 cron 时工作目录不同。

对于上下文,我正在为每个管道加载一个带有 cron_schedules 参数的配置文件。另外,在我的例子中,这是堆栈跟踪的尾部:

0 投票
1 回答
125 浏览

config - 实体配置中字段的不可设置类型不允许空值

我希望 Dagster 接受 config.yaml 中的空参数并将它们视为具有无值。

当我启动 dagit 时,我可以看到参数为空。这是有道理的,因为我在 config.yaml 中将参数的值留空。 Dagit 将参数显示为 null

但是,当我执行管道时,出现以下错误: 错误表明它需要一个字符串类型

当我在可靠配置中指定 Noneable(str) 类型时,我不确定为什么它期望类 str 的值。有趣的是,当我在 dagit 中为example_parameter传递一个非空值时,管道执行得非常好。

下面是我用于实体的配置

如何让 dagster 接受空值并将它们解析为 None?

0 投票
2 回答
558 浏览

sentry - Dagster 故障通知系统

dagster 有没有办法在某些事件发生时接收通知,例如失败?例如,是否有与哨兵等工具的集成?

0 投票
3 回答
940 浏览

dagster - Dagster:多个和条件输出(步骤输出 xxx PySparkDataFrame 的类型检查失败)

我正在执行 Dagster 教程,但我卡在了Multiple and Conditional Outputs步骤。

solid定义中,它要求声明(除其他外):

但没有信息DataFrame来自哪里。首先我尝试过,pandas.DataFrame但我遇到了错误:{dagster_type} is not a valid dagster type. 当我尝试通过$ dagit -f multiple_outputs.py. 然后我安装dagster_pyspark并尝试了dagster_pyspark.DataFrame. 这次我设法将 DAG 推到了 UI 上。但是,当我从 UI 运行它时,出现以下错误:

有谁知道如何修理它?谢谢您的帮助!

0 投票
2 回答
147 浏览

graphql - 在 Dagster 的 GraphQL API 中运行 ExecutePipeline 时,$repositoryLocationName 的值是多少?

我正在尝试启动使用 GraphQL API 运行的 Dagster 管道。我有在本地运行的 Dagit 和一个可以通过操场触发的工作管道。

但是,我现在正尝试通过 GraphQL Playground 触发管道,可在/graphql.

我正在使用以下突变:

...因此我提供以下查询参数:

我不确定repositoryLocationName应该取什么值?我尝试了一些但收到以下错误:

是我正在关注的教程。

0 投票
3 回答
579 浏览

python - 您如何确保同一管道不会同时执行两次

嘿:) 我对锁定或互斥行为有疑问。

场景

让我们假设以下场景:

  1. 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间比计划间隔长,这将导致竞争条件
  2. 两条管道非常耗费资源,因此不能并行运行。

可能的解决方案

  • 目前我会在正在运行的服务中使用某种互斥锁或锁,其中管道可以注册并允许执行或不执行。
  • 复制数据以确保每个工作流都可以清理和使用自己的数据。
  • 创建一个本地锁定文件,并确保如果成功,该文件将被删除。
  • 创建一个较小的计划间隔并检查是否存在锁定。如果条件不满足,则干净地退出。

我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流程,例如清理任务和触发其他管道。

谢谢

0 投票
1 回答
730 浏览

dagster - 测试 dagster 管道

摘要:Dagit 与 PyTest 的 Dagster 运行配置似乎与我的项目不兼容

我一直在尝试在管道上运行 pytest 时遇到错误,我非常感谢任何指针。我一直收到以下形式的错误:

关于项目的几点说明:

  • 匕首,版本 0.9.15
  • 我的管道在 Dagit 中运行,对于相同的配置没有错误
  • 单元测试针对构成管道的各个实体运行

失败的解决方案:我尝试使用实体填充配置文件,这些实体定义了每个 pytest 错误所建议的输出,但它们都导致错误比之前的错误更不透明。

我的固体是:

我的管道是这些固体的链

我在 test_main.py 中的测试代码是

其中预设的“测试”是使用 yaml 文件中的运行配置定义的:

^ 这是它抛出最多错误的地方,我一直在迭代不同的固体排列以添加 ala:

但它还没有解决问题。尽管在 Dagit 中运行时,只有输入实体需要定义,但是否有任何理由需要定义用于测试的实体的输出?

这个错误是否表明有其他问题?

编辑:这是来自 tox --verbose 的堆栈跟踪

下面 的解决方案有效 关键问题是管道需要在配置中定义实体,并且实体在其测试函数中通过相同的配置和 input_values。我的更改是删除“input_values”作为参数并通过运行配置传递它们。由于我的间隙实体需要更复杂的对象并且我的配置文件是 yaml,因此我在所有实体测试中添加了以下内容:

0 投票
1 回答
289 浏览

python - Dagster:如何重新执行管道的失败步骤?

我创建了一个测试管道,但它中途失败了。我想以编程方式重新执行它,但从管道的失败步骤开始并继续前进。我不想重复执行之前的成功步骤。

0 投票
1 回答
267 浏览

python - 如何在复合实体中使用从其他实体产生的字典?

例如,我有一个名为initial_load的实体,它产生一个字典和一个整数,例如:

我也有一个composite_solid,比如说 call_other_solid_composite

我正在传递l_dict和传递l_int 给它composite_solid ,我正在使用l_dict来获取映射到其键的值。就像是。

然后我得到一个错误:TypeError: 'InputMappingNode' object is not subscriptable。我到处搜索,但找不到解决方案。该文档也没有帮助。我有需要解析这些值的用例。任何帮助将不胜感激。