0

我每天凌晨 12 点后 3 分钟触发@daily_schedule

由“2021-02-16 00:03:00”的预定刻度触发时

日期输入显示' 2021-02-15 00:00:00',分区标记为'2021-02-15'


而如果通过分区“2021-02-16”的回填触发

日期输入显示' 2021-02-16 00:00:00',分区标记为'2021-02-16'


为什么预定的刻度会在前一天填充分区?是否可以选择使用执行的日期时间(不使用 cron @schedule)?当我使用确切日期的时间戳执行查询时,这种差异令人困惑

PS我已经测试了预定运行和回填运行以具有相同的时区。


@solid()
def test_solid(_, date):
    _.log.info(f"Input date: {date}")

@pipeline()
def test_pipeline():
    test_solid()

@daily_schedule(
    pipeline_name="test_pipeline",
    execution_timezone="Asia/Singapore",
    start_date=START_DATE,
    end_date=END_DATE,
    execution_time=time(00, 03),
    # should_execute=four_hourly_fitler
)
def test_schedule_daily(date):
    timestamp = date.strftime("%Y-%m-%d %X")
    return {
        "solids": {
            "test_solid":{
                "inputs": {
                    "date":{
                        "value": timestamp
                    }
                }
            }
        }
    }

4

2 回答 2

0

抱歉这里的麻烦 - 系统在这里所做的基本假设是,对于按日期分区的管道上的计划,您不会在当天完成之前填写分区(即填写工作2/15 的数据要到 2/16 的第二天才能运行)。这是计划的 ETL 作业中的常见模式,但您完全正确,并非所有计划都需要这种行为,这是一个很好的反馈,我们应该使这个用例更容易。

可以按照您想要的方式为分区制定计划,但比较麻烦。它看起来像这样:


from dagster import PartitionSetDefinition, date_partition_range, create_offset_partition_selector

def partition_run_config(date):
    timestamp = date.strftime("%Y-%m-%d %X")
    return {
        "solids": {
            "test_solid":{
                "inputs": {
                    "date":{
                        "value": timestamp
                    }
                }
            }
        }
    }

test_partition_set = PartitionSetDefinition(
    name="test_partition_set",
    pipeline_name="test_pipeline",
    partition_fn=date_partition_range(start=START_DATE, end=END_DATE, inclusive=True, timezone="Asia/Singapore"),
    run_config_fn_for_partition=partition_run_config,
)

test_schedule_daily = (
    test_partition_set.create_schedule_definition(
        "test_schedule_daily",
        "3 0 * * *",
        execution_timezone="Asia/Singapore",
        partition_selector=create_offset_partition_selector(lambda d:d.subtract(minutes=3)),
    )
)

这与@daily_schedule 的实现非常相似,它只是使用不同的函数将调度执行时间映射到分区(减去 3 分钟而不是 3 分钟和 1 天 - 这是 create_offset_partition_selector 部分)。

我将提交一个问题,要求自定义分区调度装饰器的映射,但同时类似的事情可能会解除您的阻止。感谢您的反馈!

于 2021-02-16T16:26:56.170 回答
0

只是对此的更新:我们向 'daily_schedule' 装饰器(以及与其他调度装饰器类似的参数)添加了一个 'partition_days_offset' 参数,可让您自定义此行为。默认仍然是返回 1 天,但设置 partition_days_offset=0 会给你希望的行为,其中执行日与分区日相同。这应该会在我们 2 月 18 日的下一个每周发布中上线。

于 2021-02-17T19:38:27.753 回答