我是 dagster 的新手,我正在尝试通过调用它的作业将资源传递给 dagster 操作,即使在遵循文档之后我也遇到问题,我不确定我是否需要再次将配置传递给作业似乎没有去工作。这是代码。
错误dagster.core.errors.DagsterInvalidConfigError: Error in config for job Error 1: Missing required config entry "resources" at the root.
import os
from dotenv import load_dotenv
load_dotenv()
@op
def return_one(context):
context.log.info(f'return_one {os.environ.get("BUCKET")}')
return 1
@op(required_resource_keys={"boto3_connection"})
def add_two(context, i: int):
context.log.info(f'##### {context.resources.boto3_connection.get_client()}')
return i + 2
@op
def multi_three(i: int):
return i * 3
class Boto3Connector(object):
def __init__(self, aws_access_key_id, aws_secret_access_key):
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
def get_client(self, resource="s3"):
session = boto3.session.Session()
session_client = session.client(
service_name=resource,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
)
return session_client
@resource(
config_schema={
'aws_access_key_id': StringSource,
'aws_secret_access_key': StringSource
})
def boto3_connection(context):
return Boto3Connector(
context.resource_config['aws_access_key_id'],
context.resource_config['aws_secret_access_key']
)
@job(resource_defs={'boto3_connection': boto3_connection})
def my_job():
multi_three(add_two(return_one()))```