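I'm trying to create partitions in Dagster that will let me run backfills. The documentation has an example, but it partitions by day of the week (which I was able to replicate). However, I'm trying to create partitions by date.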

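from datetime import datetime, timedelta
from typing import List, Union

from dagster import Partition, PartitionSetDefinition, ScheduleExecutionContext

DATE_FORMAT = "%Y-%m-%d"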
BACKFILL_DATE = "2021-04-01"
TODAY = datetime.today()


def get_number_of_days():
    backfill_date_obj = datetime.strptime(BACKFILL_DATE, DATE_FORMAT)
    delta = TODAY - backfill_date_obj

    return delta


def get_date_partitions():
    return [
        Partition(
            [
                datetime.strftime(TODAY - timedelta(days=x), DATE_FORMAT)
                for x in range(get_number_of_days().days)
            ]
        )
    ]


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"data_to_process": {"config": {"date": date}}}}


# ----------------------------------------------------------------------
date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)
# EXAMPLE CODE FROM DAGSTER DOCS.
# def weekday_partition_selector(
#     ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
# ) -> Union[Partition, List[Partition]]:
#     """Maps a schedule execution time to the corresponding partition or list
#     of partitions that
#     should be executed at that time"""
#     partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
#     weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
#     return partitions[weekday]

# My attempt. I do not want to partition by weekday name, just by date.
# Instead of returning the full list from partition_set.get_partitions(),
# I think I need to do something else with it, but I'm not sure what.
def daily_partition_selector(
    ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
    return partition_set.get_partitions(ctx.scheduled_execution_time)

my_schedule = date_partition_set.create_schedule_definition(
    "my_schedule",
    "15 8 * * *",
    partition_selector=daily_partition_selector,
    execution_timezone="UTC",
)

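Currently, the Dagster UI lumps all of the dates together in the Partitions section.

[Screenshot: actual result]

[Screenshot: expected result]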

What am I missing that would give me the expected result?

1 Answer

After I talked with the folks at Dagster, they pointed me to this documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#partition-based-schedules

This is much simpler, and I ended up with:

from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 4, 1),
    execution_time=time(hour=8, minute=15),
    execution_timezone="UTC",
)
def my_schedule(date):
    return {
        "solids": {
            "data_to_process": {
                "config": {
                    "date": date.strftime("%Y-%m-%d")
                }
            }
        }
    }
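
For comparison, the partition-set approach from the question can probably be repaired as well. My reading is that `get_date_partitions` passes the whole list of dates to a single `Partition`, which would explain why the UI shows every date lumped together. Below is a minimal sketch, using only the standard library, of generating one value per day. The helper name `date_partition_values` and its half-open `[start, end)` range are my own assumptions for illustration, not part of Dagster or of the original code.

```python
from datetime import date, timedelta
from typing import List

DATE_FORMAT = "%Y-%m-%d"


def date_partition_values(start: date, end: date) -> List[str]:
    """Return one formatted date string per day in [start, end), oldest first.

    Hypothetical helper: each string is meant to become its own partition,
    rather than the whole list becoming the value of a single partition.
    """
    num_days = (end - start).days
    return [
        (start + timedelta(days=offset)).strftime(DATE_FORMAT)
        for offset in range(num_days)
    ]


values = date_partition_values(date(2021, 4, 1), date(2021, 4, 4))
print(values)  # ['2021-04-01', '2021-04-02', '2021-04-03']
```

With the legacy API from the question, each value would then be wrapped on its own, for example `[Partition(v) for v in values]`, so that `partition.value` in `run_config_for_date_partition` is a single date string instead of a list. I have not verified this against the UI, so treat it as a sketch.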
Answered 2021-06-10T22:12:55.543