What is the correct way to access the root folder of a Composer instance's GCS bucket, or any other Airflow folder (such as /data), to save a task's output file from a simple DAG:
import logging
from os import path
from datetime import datetime
import numpy as np
import pandas as pd
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def write_to_file():
    df = pd.DataFrame(data=np.random.randint(low=0, high=10, size=(5, 5)),
                      columns=['a', 'b', 'c', 'd', 'e'])
    logging.info("Saving results")
    file_path = path.join("output.csv")
    df.to_csv(path_or_buf=file_path, index=False)
with models.DAG(dag_id='write_to_file',
                schedule_interval='*/10 * * * *',
                default_args={'depends_on_past': False,
                              'start_date': datetime(2018, 9, 8)}) as dag:
    t_start = DummyOperator(task_id='start')
    t_write = PythonOperator(
        task_id='write',
        python_callable=write_to_file
    )
    t_end = DummyOperator(task_id='end')
    t_start >> t_write >> t_end
Is there some environment variable set for this, or should I use a GCS hook?
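One possible approach, assuming a Cloud Composer environment: Composer typically mounts `gs://<your-bucket>/data` via gcsfuse at `/home/airflow/gcs/data` on the workers, so writing to that local path lands in the environment's bucket. The mount point below is an assumption to verify against your Composer version; the `output_dir` parameter exists only to make the function testable outside Composer:

```python
import logging
from os import path

import numpy as np
import pandas as pd

# Assumption: on Cloud Composer, gs://<your-bucket>/data is mounted via
# gcsfuse at /home/airflow/gcs/data, so a plain local write ends up in
# the environment's GCS bucket. Check the mapping for your version.
GCS_DATA_DIR = "/home/airflow/gcs/data"


def write_to_file(output_dir=GCS_DATA_DIR):
    # Build the same random 5x5 frame as in the original task.
    df = pd.DataFrame(data=np.random.randint(low=0, high=10, size=(5, 5)),
                      columns=['a', 'b', 'c', 'd', 'e'])
    file_path = path.join(output_dir, "output.csv")
    logging.info("Saving results to %s", file_path)
    df.to_csv(path_or_buf=file_path, index=False)
    return file_path
```

Alternatively, a GCS hook (in Airflow 1.x, `GoogleCloudStorageHook` from `airflow.contrib.hooks.gcs_hook`) can upload a file written to a local temporary path into any bucket, which avoids relying on the gcsfuse mount.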