问题标签 [dagster]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
304 浏览

python - 如何在 Dagster 中使用计划创建分区?

我正在尝试在 Dagster 中创建允许我进行回填的分区。该文档有一个示例,但它使用星期几(我能够复制)。但是,我正在尝试创建带有日期的分区。

当前的 dagster UI 将所有日期集中在分区部分。 实际结果

预期成绩

我错过了什么会给我预期的结果?

0 投票
1 回答
143 浏览

python - 使用 Dagster 收集元数据

每个人

我已经开始使用 dagster 大约一个星期了,我对这个工具很着迷。但是,我想知道是否可以收集 dagster 在输出中生成的元数据。

常规的 dagster 输出如下所示:

2021-06-17 15:12:30 - dagster - 调试 - my_pipeline- 47989433-702c-4246-9c8d-ab4c8bab4be6 - 13936 - merge_transformations - LOADED_INPUT - 使用输入管理器“io_manager”从输出“结果”加载输入“clean_daag_df”步骤“clean_dzag”

[...]

2021-06-17 15:12:30 - dagster - 调试 - my_pipeline - 47989433-702c-4246-9c8d-ab4c8bab4be6 - 13936 - merge_transformations - STEP_SUCCESS - 在 98 毫秒内完成了步骤“merge_transformations”的执行。

我想知道如何访问这些信息,特别是每个实体的开始和完成时间以及管道运行 ID,如果可能的话,每个实体执行的 ID。(而不仅仅是在屏幕上看到输出,我想将其导出到文件或数据库中)。

提前感谢您的帮助。

0 投票
1 回答
113 浏览

dagster - 我可以在 map() 之后为多个实体使用一个 DynamicResource 的输出吗?

我正在做类似于动态映射的事情,并在文档中收集示例。该示例列出目录中的文件,将每个文件映射到计算文件大小的实体,然后收集输出以汇总总体大小。

但是,我想在每个实体上并行运行多个实体。所以继续这个例子:我将列出目录中的文件;然后映射,以便我为每个文件计算大小,检查文件权限,并并行计算 md5sum;最后收集输出。

我可以在每个文件上按顺序运行它们,例如:

但是,如果这些实际上不是串行依赖项,那么并行处理每个文件的工作会很好。有没有这样的语法:

0 投票
1 回答
164 浏览

amazon-web-services - 运行 Dagster 默认模式时,如何修复错误“在路径 root:postgres_db 处收到意外的配置条目“params”?

我已经按照 Dagster 指南使用 Helm https://docs.dagster.io/deployment/guides/kubernetes/deploying-with-helm在 AWS EKS 上部署了 Dagster 。

当我尝试按照上述指南运行默认模式时,我在作业日志中收到以下错误消息:

任何克服此错误消息的提示将不胜感激。

0 投票
0 回答
243 浏览

python - Dagster 运行多个项目的管道

我需要一些帮助来使用 dagster 项目在 AWS 中进行部署,遗憾的是在官方文档中找不到。

因此,使用 repo.py 的简单实体和管道的一些上下文是完美的文件。但是当我在新目录项目中更改实体和管道的结构时,问题开始出现在 aws 中。所以目标不是使用 repo.py 来触发管道(这里是显示我也在裁判的例子)。docker-compose.yaml 文件的第 65 行使用了这个命令

dagster api grpc -h 0.0.0.0 -p 4000 -f repo.py

同样的命令会发送到我们的 AWS 基础设施以触发管道。相反,我正在寻找的是利用 workspace.yaml 文件(我可以在其中添加多个 python 包)。

那么有人认为可以像这样使用命令吗?(目前 dagster api 没有现有的 '-w' 参数)

dagster api grpc -h 0.0.0.0 -p 4000 -w workspace.yaml

如果不是,那么另一个想法是在主目录中使用模块而不是“repo.py”。Dagit 与模块配合得非常好

dagit -m project-01

但这可以用匕首来实现吗?所以命令会变成这样 dagster api grpc -h 0.0.0.0 -p 4000 -m project-01(目前它会抛出一个错误,即 project-01 不存在)

0 投票
0 回答
50 浏览

python - Dagster 对象在传递给函数时具有价值,但不在其中

NonKeySetincompositekey_pipeline()有一个值,在调用之前get_unique_df()

但是,NonKeySetinget_unique_df()没有任何价值。

这个问题与Dagster或其他有关Python吗?也许问题是在调用之后巧合发生的,而不是由Dagster.

注意:NonKeySet打印两次。指示该函数被调用两次(不知道为什么/如何)。

0 投票
2 回答
80 浏览

python-3.x - 自动启动 Dagster 计划

嗨,我正在学习 dagster,我需要有关启动计划的帮助我可以在 dagit 中添加和启动计划,但我想自动启动计划,而不是从 dagit 打开每个计划。

#这是我的代码

0 投票
1 回答
202 浏览

python - 向实体函数添加附加参数

我想在调用实体时添加其他参数,该参数继承自另一个实体,如下所示:

但是从 dagster API grpc 运行管道时,出现以下错误:

怎么修?

0 投票
0 回答
14 浏览

dagster - 重新执行失败的管道时,是否可以临时更改参数(即可靠配置)?

在重新执行失败的管道/实体之前能够编辑/重新加载导致实体配置的配置将很有用。

0 投票
0 回答
43 浏览

python-3.x - dagster - 错误 - train_pipeline | S3 URI AWS

s3_uri 没问题 MODELS.keys(),因为它不sys.exit()存在:

但是,该estimator.fit(s3_data, wait, logs=True)函数不适合s3_data,其中s3_data = s3_uri

我已经将这两个配置文件都包含在了它们的整体中:train_pipeline.yamlmodels.yaml.



Python脚本:

追溯:

train_pipeline.yaml

models.yaml

请让我知道是否还有其他我应该添加的内容。