2

所以我的问题是我在 Airflow 中构建 ETL 管道,但实际上首先在 Jupyter 笔记本中开发和测试提取、转换和加载功能。所以我最终总是在我的 Airflow Python 操作符代码和 Jupyter 笔记本之间来回复制粘贴,效率非常低!我的直觉告诉我,所有这些都可以自动化。

基本上,我想在 Jupyter 中编写我的提取、转换和加载函数并让它们留在那里,同时仍然在 Airflow 中运行管道并显示提取、转换和加载任务,以及重试和 Airflow 的所有好东西开箱即用。

Papermill 能够参数化笔记本,但我真的想不出这对我有什么帮助。有人可以帮我连接点吗?

4

6 回答 6

4

[免责声明:我是上述开源项目的提交者之一。] 我们创建了Elyra - 一组 JupyterLab 扩展 - 来简化此类工作。我们刚刚发布了 2.1 版,它提供了一个可视化编辑器,您可以使用该编辑器从 notebook 和 Python 脚本(R 支持应该很快提供)组装管道,并在 Apache Airflow、Kubeflow Pipelines 或 JupyterLab 本地运行它们。对于 Airflow(在 Kubernetes 上运行),我们创建了一个自定义操作符,负责处理和执行。我写了一篇关于它的总结文章,你可以在这里找到,如果你有兴趣尝试一下,我们有几个介绍性教程。

于 2021-03-23T14:42:19.090 回答
3

单个主 Jupyter 笔记本,带有任意数量的从笔记本(用作模板),使用 顺序执行papermill.execute_notebook,应该足以自动化任何 ML 管道。

为了在管道阶段(从一个从笔记本到下一个)之间传递信息,可以使用另一个 Netflix 包,scrapbook它允许我们在从笔记本中记录 python 对象(因为它们由 处理papermill),然后检索这些对象来自管道主机中的从属设备(保存使用scrapbook.glue和读取 - scrapbook.read_notebook)。

从任何已完成的阶段恢复也是可能的,但它需要将在前一阶段保存的必要输入存储在可从主节点访问的可预测位置(例如,在本地主 JSON 文件或 MLflow 中)。

也可以使用 cron 作业(例如来自 Kubernetes )来安排主笔记本。

  • 备择方案

由于管理成本(5 个容器,包括 2 个数据库),对于大多数 ML 团队来说,Airflow 可能是一种过度杀伤力,而其他(非 Netflix)python 包要么需要更多样板文件(Luigi),要么需要额外的特权和执行程序的自定义 docker 图像( Elyra),而 Ploomber 将面临维护者稀少的风险。

于 2021-02-27T14:38:53.327 回答
2

正如您所建议的,可以通过 Papermill 在您的 Airflow 管道中使用 Jupyter Notebooks。但是,Airflow 的优点之一是您可以将管道分成独立的步骤,这些步骤彼此独立,因此如果您决定在一个 Jupyter Notebook 中编写整个管道,那么这将违背使用 Airflow 的目的。

因此,假设您的每个离散ETL 步骤都位于单独的 Jupyter Notebook 中,您可以尝试以下操作:

  1. 为每个步骤创建一个 Jupyter Notebook。例如,copy_data_from_s3, cleanup_data, load_into_database(3 个步骤,每个一个笔记本)。
  2. 确保每个笔记本都按照 Papermill说明进行参数化。这意味着,向每个单元格添加一个标签,该标签声明可以从外部参数化的变量。
  3. 确保 Airflow 可以找到这些笔记本(例如,在 DAG 所在的同一文件夹中)
  4. 编写将使用 Papermill 参数化和运行笔记本的函数,每个步骤一个。例如:
import papermill as pm
# ...
# define DAG, etc.
# ...

def copy_data_from_s3(**context):
    pm.execute_notebook(
           "copy_data_from_s3_step.ipynb",
           "copy_data_from_s3_step.ipynb"
            parameters=dict(date=context['execution_date'])) # pass some context parameter if you need to
        )
  1. 最后,将步骤设置为 a PythonOperator(尽管BashOperator如果您想从命令行运行 Papermill 也可以使用 a )。要匹配上面的函数:
copy_data = PythonOperator(dag=dag,
                           task_id='copy_data_task',
                           provide_context=True,
                           python_callable=copy_data_from_s3)

于 2020-10-29T18:01:32.720 回答
2

Airflow 有一个造纸厂操作员,但开发经验不是很好。Airflow 中基于 Python 的 DAG 的主要问题之一是它们在相同的 Python 环境中执行,一旦您拥有多个 DAG,就会导致依赖性问题。有关详细信息,请参阅此

如果您愿意尝试一个新工具,我建议您使用Ploomber(免责声明:我是作者),它可以编排基于笔记本的管道(它在引擎盖下使用 papermill)。您可以在本地开发并导出到 Kubernetes 或 Airflow。

如果您想知道 Ploomber 中的项目是什么样子,请随时查看示例存储库

于 2020-12-28T19:19:17.803 回答
1

Ploomber 将能够解决您的问题!它实际上支持参数化笔记本并通过 Airflow/shell 脚本/Slurm/Kubeflow/Argo 进行部署。这也有助于您定义模块化管道而不是单体笔记本。这很容易/直接开始,它比造纸厂提供了更多的灵活性。一探究竟!https://github.com/ploomber/ploomber

于 2022-01-25T18:42:52.363 回答
1

为什么要将 ETL 作业用作 jupyter notebook。你看到什么优势?Notebooks 通常用于构建带有实时数据的漂亮文档。ETL 作业应该是在后台运行并自动化的脚本。

为什么这些工作不能是纯 Python 代码而不是笔记本?

此外,当您使用 PapermillOperator 运行笔记本时,运行的输出将是另一个保存在某处的笔记本。不断检查这些输出文件并不是那么友好。

我建议用纯 python 编写 ETL 作业并使用 PythonOperator 运行它。这更简单,更容易维护。

如果您想使用笔记本电脑的精美功能,那就另当别论了。

于 2021-05-24T12:17:42.243 回答