0

目前,我面临着dagster.core.errors.PartitionExecutionError来自 Dagster 的错误日志对我来说似乎并不明显。

dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set 
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
    return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
    raise error_cls(
The above exception was caused by the following exception:

TypeError: daily_download_config() takes 1 positional argument but 2 were given 
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
    yield
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
    run_config = partition_set_def.run_config_for_partition(partition)
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
    return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
  File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
    run_config_for_partition_fn=lambda partition: fn(

我目前的设置是

@graph
def download():
    """
    Download data from BigQuery then upload to S3
    """
    extract_data_in_date()


@daily_partitioned_config(start_date=datetime(2021, 12, 1))
def daily_download_config(date: datetime):
    return {
        "resources": {
            "date": date.strftime("%Y-%m-%d")
        }
    }

download_local_job = download.to_job(
    name=f'{NAME}_local',
    resource_defs={
        **{
            "date": make_values_resource(date=str),
            "project_name": ResourceDefinition.hardcoded_resource("test-123")
        },
        **RESOURCES_LOCAL,
    },
    config=daily_download_config,
    executor_def=in_process_executor
)

我不确定我错在哪里,请您帮忙

4

1 回答 1

0

@daily_paritioned_config需要能够接受两个参数,一个用于时间窗口的开始,一个用于结束。daily_download_config实际上并没有使用这个结束日期值,但它仍然需要出现在签名中,因为 Dagster 将尝试将两个参数传递给这个函数,不管

于 2022-01-05T18:12:00.830 回答