问题标签 [dagster]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
dagster - 如何避免在某些条件下运行 dagster 管道的其余部分
假设我在管道上连接了 Dagster 中的两个固体。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了达到这个结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余的实体。
引发错误以解决我的用例似乎很棘手,有没有一种方法可以跳过管道其余部分的执行而不诉诸异常?
在这个非常人为的示例中,实体 2 仅应在第一个实体的输出为奇数时执行。
在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不要将执行标记为失败,而是标记为成功。可以在每个下游实体中检查数据是否满足条件,但这很快就会增加样板。当接收数据的实体找不到要处理的数据时,最好有一种方法跳过所有下游实体的执行。
dagster - dagster 主进程的工作目录是否与调度程序进程不同
我在从 dagster 代码(设置,而不是管道)加载文件时遇到问题。假设我有以下项目结构:
当我在项目文件夹($cd project && dagit -y app/repository.yaml
)中运行 dagit 时,该文件夹成为工作目录,并且在其中repository.py
我可以加载一个知道根目录的文件project
但是,如果我设置了计划,则项目中的管道不会运行。检查 cron 日志似乎该open
行引发了一个未找到文件的异常。我想知道是否会发生这种情况,因为执行 cron 时工作目录不同。
对于上下文,我正在为每个管道加载一个带有 cron_schedules 参数的配置文件。另外,在我的例子中,这是堆栈跟踪的尾部:
sentry - Dagster 故障通知系统
dagster 有没有办法在某些事件发生时接收通知,例如失败?例如,是否有与哨兵等工具的集成?
dagster - Dagster:多个和条件输出(步骤输出 xxx PySparkDataFrame 的类型检查失败)
我正在执行 Dagster 教程,但我卡在了Multiple and Conditional Outputs步骤。
在solid
定义中,它要求声明(除其他外):
但没有信息DataFrame
来自哪里。首先我尝试过,pandas.DataFrame
但我遇到了错误:{dagster_type} is not a valid dagster type
. 当我尝试通过$ dagit -f multiple_outputs.py
. 然后我安装dagster_pyspark
并尝试了dagster_pyspark.DataFrame
. 这次我设法将 DAG 推到了 UI 上。但是,当我从 UI 运行它时,出现以下错误:
有谁知道如何修理它?谢谢您的帮助!
graphql - 在 Dagster 的 GraphQL API 中运行 ExecutePipeline 时,$repositoryLocationName 的值是多少?
我正在尝试启动使用 GraphQL API 运行的 Dagster 管道。我有在本地运行的 Dagit 和一个可以通过操场触发的工作管道。
但是,我现在正尝试通过 GraphQL Playground 触发管道,可在/graphql
.
我正在使用以下突变:
...因此我提供以下查询参数:
我不确定repositoryLocationName
应该取什么值?我尝试了一些但收到以下错误:
这是我正在关注的教程。
python - 您如何确保同一管道不会同时执行两次
嘿:) 我对锁定或互斥行为有疑问。
场景:
让我们假设以下场景:
- 管道正在处理一些本地文件。这些文件由 CI-CD 作业放置。处理后我想删除文件。如果作业花费的时间比计划间隔长,这将导致竞争条件
- 两条管道非常耗费资源,因此不能并行运行。
可能的解决方案
- 目前我会在正在运行的服务中使用某种互斥锁或锁,其中管道可以注册并允许执行或不执行。
- 复制数据以确保每个工作流都可以清理和使用自己的数据。
- 创建一个本地锁定文件,并确保如果成功,该文件将被删除。
- 创建一个较小的计划间隔并检查是否存在锁定。如果条件不满足,则干净地退出。
我知道这可能不是 dagster 的正常用例,但我也想将 dagster 用于其他工作流程,例如清理任务和触发其他管道。
谢谢
dagster - 测试 dagster 管道
摘要:Dagit 与 PyTest 的 Dagster 运行配置似乎与我的项目不兼容
我一直在尝试在管道上运行 pytest 时遇到错误,我非常感谢任何指针。我一直收到以下形式的错误:
关于项目的几点说明:
- 匕首,版本 0.9.15
- 我的管道在 Dagit 中运行,对于相同的配置没有错误
- 单元测试针对构成管道的各个实体运行
失败的解决方案:我尝试使用实体填充配置文件,这些实体定义了每个 pytest 错误所建议的输出,但它们都导致错误比之前的错误更不透明。
我的固体是:
我的管道是这些固体的链
我在 test_main.py 中的测试代码是
其中预设的“测试”是使用 yaml 文件中的运行配置定义的:
^ 这是它抛出最多错误的地方,我一直在迭代不同的固体排列以添加 ala:
但它还没有解决问题。尽管在 Dagit 中运行时,只有输入实体需要定义,但是否有任何理由需要定义用于测试的实体的输出?
这个错误是否表明有其他问题?
编辑:这是来自 tox --verbose 的堆栈跟踪
下面 的解决方案有效 关键问题是管道需要在配置中定义实体,并且实体在其测试函数中通过相同的配置和 input_values。我的更改是删除“input_values”作为参数并通过运行配置传递它们。由于我的间隙实体需要更复杂的对象并且我的配置文件是 yaml,因此我在所有实体测试中添加了以下内容:
python - Dagster:如何重新执行管道的失败步骤?
我创建了一个测试管道,但它中途失败了。我想以编程方式重新执行它,但从管道的失败步骤开始并继续前进。我不想重复执行之前的成功步骤。
python - 如何在复合实体中使用从其他实体产生的字典?
例如,我有一个名为initial_load的实体,它产生一个字典和一个整数,例如:
我也有一个composite_solid,比如说 call_other_solid_composite
我正在传递l_dict
和传递l_int
给它composite_solid
,我正在使用l_dict
来获取映射到其键的值。就像是。
然后我得到一个错误:TypeError: 'InputMappingNode' object is not subscriptable
。我到处搜索,但找不到解决方案。该文档也没有帮助。我有需要解析这些值的用例。任何帮助将不胜感激。