我有一个工作的 Dataflow 管道,第一次运行setup.py
以安装一些本地帮助模块。我现在想使用 Cloud Composer/Apache Airflow 来安排管道。我已经创建了我的 DAG 文件,并将它与我的管道项目一起放置在指定的 Google Storage DAG 文件夹中。文件夹结构如下所示:
{Composer-Bucket}/
dags/
--DAG.py
Pipeline-Project/
--Pipeline.py
--setup.py
Module1/
--__init__.py
Module2/
--__init__.py
Module3/
--__init__.py
我的 DAG 中指定 setup.py 文件的部分如下所示:
resumeparserop = dataflow_operator.DataFlowPythonOperator(
task_id="resumeparsertask",
py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
dataflow_default_options={
"project": {PROJECT-NAME},
"setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})
但是,当我查看 Airflow Web UI 中的日志时,我收到错误消息:
RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.
我不确定为什么它无法找到安装文件。如何使用设置文件/模块运行我的 Dataflow 管道?