1

我尝试并行运行,但没有按预期工作

进度条不像我想象的那样工作。

我认为这两个操作应该同时执行。

但首先在 find_highest_protein_cereal 之后运行 find_highest_calorie_cereal

1

import csv
import time
import requests
from dagster import pipeline, solid


# start_complex_pipeline_marker_0
@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_highest_calorie_cereal(cereals):
    time.sleep(5)
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )
    return sorted_cereals[-1]["name"]


@solid
def find_highest_protein_cereal(context, cereals):
    time.sleep(10)
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["protein"])
    )
    # for i in range(1, 11):
    #     context.log.info(str(i) + '~~~~~~~~')
    #     time.sleep(1)

    return sorted_cereals[-1]["name"]


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal 테스트: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_protein=find_highest_protein_cereal(cereals),
        most_calories=find_highest_calorie_cereal(cereals),
    )
4

1 回答 1

0

我不确定,但我认为您应该设置一个具有并行性的执行器。您可以使用 multiprocess_executor。

“执行者负责在管道运行中执行步骤。一旦运行启动并且运行的进程或运行工作者已分配和启动,执行者将承担执行责任。”

  1. 模式提供了可以使用的一组可能的执行器。使用 ModeDefinition 上的 executor_defs 属性。

MODE_DEV = ModeDefinition(name="dev", executor_defs=[multiprocess_executor])

@pipeline(mode_defs=[MODE_DEV], preset_defs=[Preset_test])

  1. 运行配置的执行配置部分决定了实际的执行者。

在 yml 文件或 run_config 中,设置:

execution:
  multiprocess:
    config:
      max_concurrent: 4

检索自:https ://docs.dagster.io/deployment/executors

于 2021-06-09T04:53:10.517 回答