我正在尝试在 Dagster 中创建允许我进行回填的分区。该文档有一个示例,但它使用星期几(我能够复制)。但是,我正在尝试创建带有日期的分区。
DATE_FORMAT = "%Y-%m-%d"
BACKFILL_DATE = "2021-04-01"
TODAY = datetime.today()
def get_number_of_days():
backfill_date_obj = datetime.strptime(BACKFILL_DATE, DATE_FORMAT)
delta = TODAY - backfill_date_obj
return delta
def get_date_partitions():
return [
Partition(
[
datetime.strftime(TODAY - timedelta(days=x), DATE_FORMAT)
for x in range(get_number_of_days().days)
]
)
]
def run_config_for_date_partition(partition):
date = partition.value
return {"solids": {"data_to_process": {"config": {"date": date}}}}
# ----------------------------------------------------------------------
date_partition_set = PartitionSetDefinition(
name="date_partition_set",
pipeline_name="my_pipeline",
partition_fn=get_date_partitions,
run_config_fn_for_partition=run_config_for_date_partition,
)
# EXAMPLE CODE FROM DAGSTER DOCS.
# def weekday_partition_selector(
# ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
# ) -> Union[Partition, List[Partition]]:
# """Maps a schedule execution time to the corresponding partition or list
# of partitions that
# should be executed at that time"""
# partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
# weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
# return partitions[weekday]
# My attempt. I do not want to partition by the weekday name, but just by the date.
# Instead of returnng the partition_set, I think I need to do something else with it
# but I'm not sure what it is.
def daily_partition_selector(
ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
return partition_set.get_partitions(ctx.scheduled_execution_time)
my_schedule = date_partition_set.create_schedule_definition(
"my_schedule",
"15 8 * * *",
partition_selector=daily_partition_selector,
execution_timezone="UTC",
)
当前的 dagster UI 将所有日期集中在分区部分。 实际结果
我错过了什么会给我预期的结果?