问题标签 [dagster]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Dagster 使用其输出从另一个管道启动管道
我应该如何在管道 A 完成后启动管道 B,并将管道 A 的输出用于管道 B?
一段代码作为起点:
这里我们有三个管道:pipeline1
有两个实体,可能做我们想做的任何事情,并从第二个实体返回输出。pipeline2
应该使用 的输出pipeline1_task2
,最终做另一件工作并打印第一个管道的输出。
我应该如何“连接”两条管道?
python - Dagster 在配置模式中传递有序字典
如何在 Dagster solid 的配置模式中传递 Ordered Dict?
简单的事情:
不起作用。在 Windows 10 上执行此操作始终如一地给出
在 Linux 上,它有时会切换它们,这对我不起作用。
如何传入collections.OrderedDict
配置模式,以便字典中的条目完全按照我在模式中传递它们的顺序排列?
pipeline - 我如何告诉 Dagit(Dagster GUI)在现有的 Dask 集群上运行?
我正在使用 dagster 0.11.3(撰写本文时的最新版本)
我创建了一个 Dagster 管道(保存为 pipeline.py),如下所示:
我将 DAGSTER_HOME 环境变量设置为包含名为 dagster.yaml 的文件的目录,该文件是一个空文件。这应该没问题,因为基于这些文档的默认值是合理的:https ://docs.dagster.io/deployment/dagster-instance 。
我有一个在“scheduler:8786”上运行的现有 Dask 集群。基于这些文档:https : //docs.dagster.io/deployment/custom-infra/dask,我创建了一个名为 config.yaml 的运行配置,如下所示:
我已经成功地将这个运行配置与 Dagster 一起使用,如下所示:
(我检查了 Dask 日志并确保它确实在我的 Dask 集群上运行)
我的问题是: 如何让 Dagit 使用这个 Dask 集群?我发现唯一似乎相关的是: https ://docs.dagster.io/_apidocs/execution#executors
...但它甚至没有提到 Dask 作为一个选项(它有 dagster.in_process_executor 和 dagster.multiprocess_executor,它们似乎与 dask 一点关系都没有)。
可能我需要配置 dagster-dask,此处记录:https ://docs.dagster.io/_apidocs/libraries/dagster-dask#dask-dagster-dask
...但是在使用 Dagit 时我应该把运行配置放在哪里?例如,无法将 config.yaml 提供给 Dagit。
dagster - 如何在运行时将标签/调试信息添加到可靠的上下文中,可以从 failure_hook 中检索?
首先,让我给出我的用例。
我有一个实体,可以运行并尝试使用特定的输入和配置。我还有一个失败的钩子,可以记录到松弛。
在实体步骤的执行过程中,我想在上下文中添加一些信息(可能是选择的输入变量值等),可能是以字典的形式。然后我希望 failure_hook 能够为它执行的任何错误记录检索这些。
实现此模式的最佳方法是什么?
python - Dagster 包装函数的文档字符串未出现在 Sphinx 中
我有一个使用 pypspark 和现在 dagster 用 python 编写的项目。我们使用 Sphinx 来构建文档,使用 napoleon 来解析 Google 风格的文档字符串。我们已经开始包含如下预包装的 dagster 实体:
当我们使用 sphinx-apidoc 构建时,我可以通过检查看到该函数的文档字符串存在,join_two_dfs_solid.__doc__
并且 dagster 附加join_two_dfs_solid._description
字段为空,这应该意味着它使用了文档字符串。但是,当 sphinx 文档构建时,我得到一个空白的 .rst 文件,用于包含该实体的模块。有谁知道狮身人面像中是否有任何其他配置设置或我需要更改以使其正确构建的实体?
dagster - 如何定义具有多个参数的复合实体,包括“Nothing”?
我有一个固体需要在 2 个固体之后运行。一个将返回一个值,另一个不返回任何内容,但具有依赖实体并且需要时间来运行。
我在multiprocessing
mode中执行管道,如果实体没有定义依赖项,它们会同时运行。
以下是我正在寻找的示例情况。假设我有以下固体。
执行时,这些实体将在下面的时间线中运行。
时间飞逝 | 坚硬的 |
---|---|
0 | 管道开始... |
1秒 | solid_b 开始 |
3 秒 | solid_a 依赖实体正在运行。solid_a 还没有开始。 |
5 秒 | solid_b 完成的 |
10 秒 | solid_a 现在开始 |
15 秒 | solid_a 完成的 |
20 秒 | my_composite_solid 应该现在开始。 |
所以,根据这个时间表,为了my_composite_solid
开始,我需要两者solid_a
并solid_b
完成执行。但是,当我这样做时,dagster 会抛出一个错误:
如果我不将solid_a
输出作为依赖my_composite_solid
项,它将在solid_b
. 我应该怎么办?
python - Dagster 在从实体调用的标准 python 函数中使用 dagster 的自定义记录器
我可以配置一个自定义记录器(例如,一个文件记录器),我可以在一个实体中成功使用它context.log.info
(例如)。如何在标准 Python 函数/类中使用相同的记录器?
我正在使用标准colored_console_logger
,以便我可以直接在控制台中看到正在发生的事情。这个想法是将它(或与它一起使用)与另一个(自定义)记录器交换。
可重现的例子:test_logging.py
随机函数.py