我最近选择了 Dagster 来评估作为 Airflow 的替代品。
我一直无法理解资源的概念,并试图了解我正在尝试做的事情是否可能或可以以不同的方式更好地实现。
我有一个像下面这样的帮助类,可以帮助保持代码干燥
from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource
class HelperAwsS3:
def __init__(self, s3_resource):
self.s3_resource = s3_resource
def s3_list_bucket(self, bucket, prefix):
return self.s3_resource.list_objects_v2(
Bucket=bucket,
Prefix=prefix
)
def s3_download_file(self, bucket, file, local_path):
self.s3_resource.meta.client.download_file(
Bucket=bucket,
Key=file,
Filename=local_path
)
def s3_upload_file(self, bucket, file, local_path):
self.s3_resource.meta.client.upload_file(
Bucket=bucket,
Key=file,
Filename=local_path
)
s3_resource实际上是dagster_aws.s3.s3_resource,它将帮助我使用本地 aws 凭据连接到 AWS。
当我在下面的@resource部分进行调用时,我不确定如何将 s3_resource 传递给HelperAwsS3 。
@resource
def connection_helper_aws_s3_resource(context):
return HelperAwsS3()
请问有什么指点吗?还是我做错了,需要以不同的方式做?
谢谢你的帮助。