我有 DAGS,我想在同一个分布式气流集群的不同 python 环境中运行。是否可以在执行 DAG 时创建虚拟 python 环境?
我希望将 tfx ML 管道添加到我们的分布式气流平台,并且无法在每个工作节点中单独安装 tfx。
编辑:我希望使用基于 python 运算符的固有 tfx AirflowDagRunner。我宁愿不更改任何 tfx 代码来处理此案。
您应该尝试PythonVirtualenvOperator,Airflow 将创建特殊的 venv 与操作员参数的依赖关系
带有 openpyxl 和 pandas 库的示例 venv:
extract_some_data = PythonVirtualenvOperator(
task_id="extract_some_data ",
python_callable=extract_some_data ,
requirements=["openpyxl", "pandas"],
python_version='3.7',
system_site_packages=False,
provide_context=True,
op_kwargs={
"uuid": "{{ ti.xcom_pull(key=None, task_ids='generate_uuid') }}"
},
dag=dag
)
气流文件:
您可以随意创建和拆除虚拟环境 ( venv
)。如果您的软件在被拆除之前没有等待环境的结果,那将是最好的。
要以编程方式创建环境,只需从您最喜欢的编程语言运行 shell 命令并传入常用命令行。如果您使用 venv 中的程序执行 Python 程序python
,则将选择 venv。
例如,如果您在 中创建一个 venv /var/venvs/123
,请使用类似的命令行运行您的 Python 程序/var/venvs/123/bin/python
,并传递任何必要的参数。