0

我最近选择了 Dagster 来评估作为 Airflow 的替代品。

我一直无法理解资源的概念,并试图了解我正在尝试做的事情是否可能或可以以不同的方式更好地实现。

我有一个像下面这样的帮助类,可以帮助保持代码干燥

from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource

class HelperAwsS3:
    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

    def s3_download_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.upload_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

s3_resource实际上dagster_aws.s3.s3_resource,它将帮助我使用本地 aws 凭据连接到 AWS。

当我在下面的@resource部分进行调用时,我不确定如何将 s3_resource 传递给HelperAwsS3 。

@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()

请问有什么指点吗?还是我做错了,需要以不同的方式做?

谢谢你的帮助。

4

1 回答 1

0

我在 dagster Slack 频道上发布了同样的问题,并很快得到了乐于助人的团队的回复。在这里发布它,以防它帮助某人 -

保留您的 HelperAwsS3 类并编写您自己的使用 s3 资源的资源,它可能看起来像这样:

@resource(required_resource_keys={"s3"})
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3(s3_resource=context.resources.s3)

(然后确保在您的模式定义中包含 s3 资源和您的自定义资源:

@pipeline(mode_defs=[ModeDefinition(
  resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
)]):
  ...
于 2021-10-13T16:46:30.750 回答