I'm working through the Dagster tutorial, but I'm stuck at the Multiple and Conditional Outputs step.

In the solid definition, it asks you to declare (among other things):

output_defs=[
    OutputDefinition(
        name="hot_cereals", dagster_type=DataFrame, is_required=False
    ),
    OutputDefinition(
        name="cold_cereals", dagster_type=DataFrame, is_required=False
    ),
],

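But there is no information about where DataFrame comes from. First I tried pandas.DataFrame, but when I tried to load the file with $ dagit -f multiple_outputs.py, I got the error: {dagster_type} is not a valid dagster type. Then I installed dagster_pyspark and tried dagster_pyspark.DataFrame. This time I managed to get the DAG into the UI. However, when I run it from the UI, I get the following error: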

dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output hot_cereals of type PySparkDataFrame.
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 210, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 273, in core_dagster_event_sequence_for_step
    for evt in _create_step_events_for_output(step_context, user_event):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 298, in _create_step_events_for_output
    for output_event in _type_checked_step_output_event_sequence(step_context, output):
  File "/Users/bambrozio/.local/share/virtualenvs/dagster-tutorial/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 221, in _type_checked_step_output_event_sequence
    dagster_type=step_output.dagster_type,

Does anyone know how to fix it? Thanks for your help!

3 Answers

As Arthur pointed out, the complete tutorial code is available on Dagster's GitHub.

However, you don't need dagster_pandas; the key lines missing from your code are:

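import typing
from typing import Any  # needed by the "# type: Any" comment below
from dagster import PythonObjectDagsterType

if typing.TYPE_CHECKING:
    DataFrame = list
else:
    DataFrame = PythonObjectDagsterType(list, name="DataFrame")  # type: Any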

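The reason for this structure is MyPy compliance: a static checker such as MyPy sees DataFrame as a plain alias for list, while at runtime Dagster sees a Dagster type wrapping list. See the Types and Expectations section of the tutorial.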
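The pattern above relies on typing.TYPE_CHECKING being a constant that is True only for static type checkers and False when the code actually runs. The sketch below shows that behaviour in plain Python. FakePythonObjectType is a hypothetical stand-in for Dagster's PythonObjectDagsterType: it only mimics the idea of wrapping a Python class and type-checking values with isinstance(), and is not Dagster's implementation. Under that assumption it also suggests why the run in the question likely failed: the tutorial's solids pass around lists of dicts, so a type that expects a PySpark DataFrame rejects them.

```python
import typing
from typing import Any


class FakePythonObjectType:
    """Hypothetical stand-in for dagster.PythonObjectDagsterType.

    Mimics only the idea: wrap a Python class and check values with
    isinstance(). This is NOT Dagster's actual implementation.
    """

    def __init__(self, python_type: type, name: str) -> None:
        self.python_type = python_type
        self.name = name

    def type_check(self, value: Any) -> bool:
        return isinstance(value, self.python_type)


if typing.TYPE_CHECKING:
    # Static checkers such as MyPy take this branch and see a plain list alias.
    DataFrame = list
else:
    # At runtime typing.TYPE_CHECKING is False, so this branch is executed.
    DataFrame = FakePythonObjectType(list, name="DataFrame")  # type: Any


# A sample row shaped like the tutorial's CSV rows (values are illustrative).
cereals = [{"name": "Cream of Wheat", "type": "H", "calories": 100}]

print(DataFrame.name)                  # DataFrame
print(DataFrame.type_check(cereals))   # True: the rows are a list
print(DataFrame.type_check({"a": 1}))  # False: a dict is not a list
```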

See also the documentation on Dagster types.

answered 2020-09-03T07:34:53.800

I was stuck here too, but luckily I found the updated source code. They have updated the docs, and in the updated code the OutputDefinitions no longer specify a dagster_type, so no DataFrame type is needed.

Update the part of your code that comes before the sorting solid and the pipeline definition as follows:

import csv
import os

from dagster import (
    Bool,
    Field,
    Output,
    OutputDefinition,
    execute_pipeline,
    pipeline,
    solid,
)


@solid
def read_csv(context, csv_path):
    lines = []
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        for row in csv.DictReader(fd):
            row["calories"] = int(row["calories"])
            lines.append(row)

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid(
    config_schema={
        "process_hot": Field(Bool, is_required=False, default_value=True),
        "process_cold": Field(Bool, is_required=False, default_value=True),
    },
    output_defs=[
        OutputDefinition(name="hot_cereals", is_required=False),
        OutputDefinition(name="cold_cereals", is_required=False),
    ],
)
def split_cereals(context, cereals):
    if context.solid_config["process_hot"]:
        hot_cereals = [cereal for cereal in cereals if cereal["type"] == "H"]
        yield Output(hot_cereals, "hot_cereals")
    if context.solid_config["process_cold"]:
        cold_cereals = [cereal for cereal in cereals if cereal["type"] == "C"]
        yield Output(cold_cereals, "cold_cereals")
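To see what the conditional outputs in split_cereals do independently of Dagster, here is a plain-Python sketch of the same branching. split_cereals_plain is a hypothetical helper, not part of Dagster or the tutorial: it yields (output_name, rows) pairs where the solid yields Output(rows, name), and the sample rows are made up.

```python
from typing import Any, Dict, Iterator, List, Tuple

Row = Dict[str, Any]


def split_cereals_plain(
    cereals: List[Row], process_hot: bool = True, process_cold: bool = True
) -> Iterator[Tuple[str, List[Row]]]:
    """Hypothetical plain-Python mirror of the split_cereals solid.

    An output that is switched off is simply never yielded, which is
    why both OutputDefinitions are declared with is_required=False.
    """
    if process_hot:
        yield "hot_cereals", [c for c in cereals if c["type"] == "H"]
    if process_cold:
        yield "cold_cereals", [c for c in cereals if c["type"] == "C"]


# Made-up sample rows in the shape produced by read_csv.
rows = [
    {"name": "hot one", "type": "H", "calories": 100},
    {"name": "cold one", "type": "C", "calories": 110},
]

print(dict(split_cereals_plain(rows)))                     # both outputs
print(dict(split_cereals_plain(rows, process_hot=False)))  # only "cold_cereals"
```

In the real pipeline, solids downstream of an optional output that was not yielded are skipped rather than failing, which is the point of marking the outputs as not required.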

You can also find the complete code here.

answered 2020-09-30T04:13:03.067

First, try installing the Dagster pandas integration:

pip install dagster_pandas

Then do:

from dagster_pandas import DataFrame

You can find the code from the tutorial here.
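With from dagster_pandas import DataFrame, the output is, as far as I know, type-checked against pandas.DataFrame, so the values the solid yields must themselves be pandas DataFrames. The tutorial's read_csv returns a list of dicts, which would not pass that check. A minimal sketch of the conversion, assuming pandas is installed and using made-up sample rows in place of the tutorial's CSV data:

```python
import pandas as pd

# Made-up rows shaped like the tutorial's read_csv output: a list of dicts.
rows = [
    {"name": "hot one", "type": "H", "calories": 100},
    {"name": "cold one", "type": "C", "calories": 110},
]

# Build a pandas DataFrame; each dict key becomes a column.
df = pd.DataFrame(rows)

# Boolean-mask filtering returns DataFrames, not lists.
hot_cereals = df[df["type"] == "H"]
cold_cereals = df[df["type"] == "C"]

print(isinstance(rows, pd.DataFrame))         # False: a plain list
print(isinstance(hot_cereals, pd.DataFrame))  # True
```

Inside the solid you would then yield Output(hot_cereals, "hot_cereals") and Output(cold_cereals, "cold_cereals") with these DataFrames; this is untested against any particular dagster_pandas release.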

answered 2020-09-02T12:55:59.450