I'm fairly new to Airflow and am looking for help bringing `airflow.providers.amazon.aws.transfers.ftp_to_s3` into my DAG. How are the parameters (`s3_bucket`, `aws_conn_id`, `ftp_conn_id`, etc.) passed to this operator? Do they live in the referenced Python script itself? My current DAG, with `BashOperator` placeholders for the FTP and S3 steps, is below the doc links.

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/transfers/ftp_to_s3/index.html

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/transfers/ftp_to_s3.html

    from airflow.models import DAG
    from airflow.operators.bash import BashOperator
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    
    from datetime import datetime
    
    
    default_args = {
        'start_date': datetime(2021, 1, 1)
    }
    
    with DAG('oempricetapes_poc',
             schedule_interval='@daily',
             default_args=default_args,
             catchup=False) as dag:
    
        # Placeholder for the FTP-to-S3 transfer step
        ftp_task = BashOperator(
            task_id='ftp_task',
            bash_command='sleep 3'
        )
    
        # Placeholder for a sensor that waits for the file to land in S3
        s3_sensor = BashOperator(
            task_id='s3_sensor',
            bash_command='sleep 3'
        )
    
        create_input_staging_table = PostgresOperator(
            task_id='create_input_staging_table',
            postgres_conn_id='qa_redshift_oempricetapes',
            sql='create_input_staging_table.sql'
        )
    
    
    ftp_task >> s3_sensor >> create_input_staging_table
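
From the docs linked above, my understanding is that these parameters are passed directly as constructor arguments to `FTPToS3Operator` in the DAG file, while `ftp_conn_id` and `aws_conn_id` reference connections configured separately in Airflow (via the UI, CLI, or environment variables). Here is a minimal sketch of what I think would replace the `ftp_task` placeholder inside the `with DAG` block (the connection IDs, bucket, and paths are all made-up placeholders). Is this the right approach?

    from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FTPToS3Operator

    # Goes inside the `with DAG(...)` block, replacing the BashOperator placeholder
    ftp_task = FTPToS3Operator(
        task_id='ftp_task',
        ftp_path='/remote/path/pricetape.csv',  # file on the FTP server (placeholder)
        s3_bucket='my-pricetapes-bucket',       # destination bucket (placeholder)
        s3_key='staging/pricetape.csv',         # destination key in S3 (placeholder)
        ftp_conn_id='my_ftp_conn',              # FTP connection set up in Airflow (placeholder)
        aws_conn_id='my_aws_conn',              # AWS connection set up in Airflow (placeholder)
        replace=True                            # overwrite the S3 key if it already exists
    )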