
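I'm new to dagster and I'm trying to pass a resource to a dagster op through the job that calls it. Even after following the documentation I'm running into a problem, and I'm not sure whether I need to pass the config to the job again; it doesn't seem to work. Here is the code.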

Error: dagster.core.errors.DagsterInvalidConfigError: Error in config for job Error 1: Missing required config entry "resources" at the root.

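import os

import boto3
from dagster import StringSource, job, op, resource
from dotenv import load_dotenv

# Load variables such as BUCKET and the AWS credentials from a local .env file
load_dotenv()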


@op
def return_one(context):
    context.log.info(f'return_one {os.environ.get("BUCKET")}')
    return 1


@op(required_resource_keys={"boto3_connection"})
def add_two(context, i: int):
    context.log.info(f'##### {context.resources.boto3_connection.get_client()}')
    return i + 2


@op
def multi_three(i: int):
    return i * 3


class Boto3Connector(object):
    def __init__(self, aws_access_key_id, aws_secret_access_key):
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key


    def get_client(self, resource="s3"):
        session = boto3.session.Session()

        session_client = session.client(
            service_name=resource,
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
        )
        return session_client


@resource(
    config_schema={
        'aws_access_key_id': StringSource,
        'aws_secret_access_key': StringSource
    })
def boto3_connection(context):
    return Boto3Connector(
        context.resource_config['aws_access_key_id'],
        context.resource_config['aws_secret_access_key']
    )


@job(resource_defs={'boto3_connection': boto3_connection})
def my_job():
    multi_three(add_two(return_one()))

1 Answer


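My mistake was assuming that the resource config would be passed automatically; you have to specify it in the job config. So I just added the config.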

@job(
    resource_defs={'boto3_connection': boto3_connection},
    config={
        'resources': {
            'boto3_connection': {
                'config': {
                    'aws_access_key_id': {'env': 'AWS_ACCESS_KEY_ID'},
                    'aws_secret_access_key': {'env': 'AWS_SECRET_ACCESS_KEY'},
                }
            }
        }
    },
)
def my_job():
    multi_three(add_two(return_one()))

I was pointed in the right direction on the dagster Slack.
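For anyone who would rather not bake the config into the `@job` decorator, the same dictionary can probably be supplied at launch time instead. Below is a minimal sketch of that idea, not a tested recipe: `build_run_config` and `missing_env_vars` are hypothetical helpers I made up for illustration, and the commented-out call assumes `my_job` is defined as in the question (without `config=`) and that dagster is installed.

```python
import os


def build_run_config(key_id_var="AWS_ACCESS_KEY_ID",
                     secret_var="AWS_SECRET_ACCESS_KEY"):
    """Hypothetical helper: build the run config the error message asks for.

    The top-level "resources" key is the entry reported as missing; each
    {"env": NAME} value asks StringSource to read NAME from the environment.
    """
    return {
        "resources": {
            "boto3_connection": {
                "config": {
                    "aws_access_key_id": {"env": key_id_var},
                    "aws_secret_access_key": {"env": secret_var},
                }
            }
        }
    }


def missing_env_vars(run_config):
    """Hypothetical helper: list referenced env vars that are not set."""
    cfg = run_config["resources"]["boto3_connection"]["config"]
    names = [v["env"] for v in cfg.values()
             if isinstance(v, dict) and "env" in v]
    return [name for name in names if name not in os.environ]


run_config = build_run_config()

# Assumption: my_job is the job from the question, defined without config=.
# result = my_job.execute_in_process(run_config=run_config)
```

Calling `missing_env_vars(run_config)` before launching is just a convenience to catch an unset variable early; it is not part of dagster.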

Answered 2021-11-19T05:47:04.057