我尝试并行运行,但没有按预期工作
进度条不像我想象的那样工作。
我认为这两个操作应该同时执行。
但首先在 find_highest_protein_cereal 之后运行 find_highest_calorie_cereal
import csv
import time
import requests
from dagster import pipeline, solid
# start_complex_pipeline_marker_0
@solid
def download_cereals():
response = requests.get("https://docs.dagster.io/assets/cereal.csv")
lines = response.text.split("\n")
return [row for row in csv.DictReader(lines)]
@solid
def find_highest_calorie_cereal(cereals):
time.sleep(5)
sorted_cereals = list(
sorted(cereals, key=lambda cereal: cereal["calories"])
)
return sorted_cereals[-1]["name"]
@solid
def find_highest_protein_cereal(context, cereals):
time.sleep(10)
sorted_cereals = list(
sorted(cereals, key=lambda cereal: cereal["protein"])
)
# for i in range(1, 11):
# context.log.info(str(i) + '~~~~~~~~')
# time.sleep(1)
return sorted_cereals[-1]["name"]
@solid
def display_results(context, most_calories, most_protein):
context.log.info(f"Most caloric cereal 테스트: {most_calories}")
context.log.info(f"Most protein-rich cereal: {most_protein}")
@pipeline
def complex_pipeline():
cereals = download_cereals()
display_results(
most_protein=find_highest_protein_cereal(cereals),
most_calories=find_highest_calorie_cereal(cereals),
)