问题标签 [dagster]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
765 浏览

python - Dagster 使用其输出从另一个管道启动管道

我应该如何在管道 A 完成后启动管道 B,并将管道 A 的输出用于管道 B?

一段代码作为起点:

这里我们有三个管道:pipeline1有两个实体,可能做我们想做的任何事情,并从第二个实体返回输出。pipeline2应该使用 的输出pipeline1_task2,最终做另一件工作并打印第一个管道的输出。

我应该如何“连接”两条管道?

0 投票
1 回答
149 浏览

python - Dagster 在配置模式中传递有序字典

如何在 Dagster solid 的配置模式中传递 Ordered Dict?

简单的事情:

不起作用。在 Windows 10 上执行此操作始终如一地给出

在 Linux 上,它有时会切换它们,这对我不起作用。

如何传入collections.OrderedDict配置模式,以便字典中的条目完全按照我在模式中传递它们的顺序排列?

0 投票
1 回答
653 浏览

pipeline - 我如何告诉 Dagit(Dagster GUI)在现有的 Dask 集群上运行?

我正在使用 dagster 0.11.3(撰写本文时的最新版本)

我创建了一个 Dagster 管道(保存为 pipeline.py),如下所示:

我将 DAGSTER_HOME 环境变量设置为包含名为 dagster.yaml 的文件的目录,该文件是一个空文件。这应该没问题,因为基于这些文档的默认值是合理的:https ://docs.dagster.io/deployment/dagster-instance 。

我有一个在“scheduler:8786”上运行的现有 Dask 集群。基于这些文档:https : //docs.dagster.io/deployment/custom-infra/dask,我创建了一个名为 config.yaml 的运行配置,如下所示:

我已经成功地将这个运行配置与 Dagster 一起使用,如下所示:

(我检查了 Dask 日志并确保它确实在我的 Dask 集群上运行)

我的问题是: 如何让 Dagit 使用这个 Dask 集群?我发现唯一似乎相关的是: https ://docs.dagster.io/_apidocs/execution#executors

...但它甚至没有提到 Dask 作为一个选项(它有 dagster.in_process_executor 和 dagster.multiprocess_executor,它们似乎与 dask 一点关系都没有)。

可能我需要配置 dagster-dask,此处记录:https ://docs.dagster.io/_apidocs/libraries/dagster-dask#dask-dagster-dask

...但是在使用 Dagit 时我应该把运行配置放在哪里?例如,无法将 config.yaml 提供给 Dagit。

0 投票
1 回答
37 浏览

dagster - 如何在运行时将标签/调试信息添加到可靠的上下文中,可以从 failure_hook 中检索?

首先,让我给出我的用例。

我有一个实体,可以运行并尝试使用特定的输入和配置。我还有一个失败的钩子,可以记录到松弛。

在实体步骤的执行过程中,我想在上下文中添加一些信息(可能是选择的输入变量值等),可能是以字典的形式。然后我希望 failure_hook 能够为它执行的任何错误记录检索这些。

实现此模式的最佳方法是什么?

0 投票
1 回答
731 浏览

dagster - Dagster 配置 YAML 中的环境变量

我正在尝试在配置 YAML 文件中提供一个环境变量,如下所示:

但我收到以下错误:

我已经在这个官方示例中看到了这种语法。我错过了一些明显的东西吗?

0 投票
1 回答
94 浏览

python - Dagster 包装函数的文档字符串未出现在 Sphinx 中

我有一个使用 pypspark 和现在 dagster 用 python 编写的项目。我们使用 Sphinx 来构建文档,使用 napoleon 来解析 Google 风格的文档字符串。我们已经开始包含如下预包装的 dagster 实体:

当我们使用 sphinx-apidoc 构建时,我可以通过检查看到该函数的文档字符串存在,join_two_dfs_solid.__doc__并且 dagster 附加join_two_dfs_solid._description字段为空,这应该意味着它使用了文档字符串。但是,当 sphinx 文档构建时,我得到一个空白的 .rst 文件,用于包含该实体的模块。有谁知道狮身人面像中是否有任何其他配置设置或我需要更改以使其正确构建的实体?

0 投票
1 回答
124 浏览

dagster - 找不到用于 celery-k8s 部署的 AWS 凭证

我正在尝试使用 celery-k8s 运行 dagster 并使用示例/celery-k8s 作为开始。从操场上运行管道后,我得到

如文档中所述,我已在 env 变量中配置 aws 凭据

而且我还可以看到这些值是在 pod 的 env 变量中设置的,并且还可以在 pip install awscli 和 aws s3 ls 之后访问 s3 位置,但请查看作业 pod 下方的屏幕截图但是会抛出Unable to locate credentials

截屏 请帮忙

0 投票
0 回答
157 浏览

dagster - 如何定义具有多个参数的复合实体,包括“Nothing”?

我有一个固体需要在 2 个固体之后运行。一个将返回一个值,另一个不返回任何内容,但具有依赖实体并且需要时间来运行。

我在multiprocessingmode中执行管道,如果实体没有定义依赖项,它们会同时运行。

以下是我正在寻找的示例情况。假设我有以下固体。

执行时,这些实体将在下面的时间线中运行。

时间飞逝 坚硬的
0 管道开始...
1秒 solid_b开始
3 秒 solid_a依赖实体正在运行。solid_a还没有开始。
5 秒 solid_b完成的
10 秒 solid_a现在开始
15 秒 solid_a完成的
20 秒 my_composite_solid 应该现在开始

所以,根据这个时间表,为了my_composite_solid开始,我需要两者solid_asolid_b完成执行。但是,当我这样做时,dagster 会抛出一个错误:

如果我不将solid_a输出作为依赖my_composite_solid项,它将在solid_b. 我应该怎么办?

0 投票
1 回答
244 浏览

python - Dagster 在从实体调用的标准 python 函数中使用 dagster 的自定义记录器

我可以配置一个自定义记录器(例如,一个文件记录器),我可以在一个实体中成功使用它context.log.info(例如)。如何在标准 Python 函数/类中使用相同的记录器?

我正在使用标准colored_console_logger,以便我可以直接在控制台中看到正在发生的事情。这个想法是将它(或与它一起使用)与另一个(自定义)记录器交换。

可重现的例子:test_logging.py

随机函数.py

0 投票
1 回答
512 浏览

dagster - dagster solid 并行运行测试示例

我尝试并行运行,但没有按预期工作

进度条不像我想象的那样工作。

我认为这两个操作应该同时执行。

但首先在 find_highest_protein_cereal 之后运行 find_highest_calorie_cereal

1