正如您所建议的,可以通过 Papermill 在您的 Airflow 管道中使用 Jupyter Notebooks。但是,Airflow 的优点之一是您可以将管道分成独立的步骤,这些步骤彼此独立,因此如果您决定在一个 Jupyter Notebook 中编写整个管道,那么这将违背使用 Airflow 的目的。
因此,假设您的每个离散ETL 步骤都位于单独的 Jupyter Notebook 中,您可以尝试以下操作:
- 为每个步骤创建一个 Jupyter Notebook。例如,
copy_data_from_s3
, cleanup_data
, load_into_database
(3 个步骤,每个一个笔记本)。
- 确保每个笔记本都按照 Papermill说明进行参数化。这意味着,向每个单元格添加一个标签,该标签声明可以从外部参数化的变量。
- 确保 Airflow 可以找到这些笔记本(例如,在 DAG 所在的同一文件夹中)
- 编写将使用 Papermill 参数化和运行笔记本的函数,每个步骤一个。例如:
import papermill as pm
# ...
# define DAG, etc.
# ...
def copy_data_from_s3(**context):
pm.execute_notebook(
"copy_data_from_s3_step.ipynb",
"copy_data_from_s3_step.ipynb"
parameters=dict(date=context['execution_date'])) # pass some context parameter if you need to
)
- 最后,将步骤设置为 a
PythonOperator
(尽管BashOperator
如果您想从命令行运行 Papermill ,也可以使用 a )。要匹配上面的函数:
copy_data = PythonOperator(dag=dag,
task_id='copy_data_task',
provide_context=True,
python_callable=copy_data_from_s3)