
What is the correct way to access the root folder of the Composer instance's GCS bucket, or any other Airflow folder such as /data, to save a task's output file from a simple DAG:

import logging
from os import path
from datetime import datetime

import numpy as np
import pandas as pd
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def write_to_file():
    df = pd.DataFrame(data=np.random.randint(low=0, high=10, size=(5, 5)),
                      columns=['a', 'b', 'c', 'd', 'e'])
    logging.info("Saving results")

    # With no directory component, this resolves to "output.csv" in the
    # worker's current working directory, not anywhere in the GCS bucket.
    file_path = path.join("output.csv")

    df.to_csv(path_or_buf=file_path, index=False)


with models.DAG(dag_id='write_to_file',
                schedule_interval='*/10 * * * *',
                default_args={'depends_on_past': False,
                              'start_date': datetime(2018, 9, 8)}) as dag:
    t_start = DummyOperator(task_id='start')

    t_write = PythonOperator(
        task_id='write',
        python_callable=write_to_file
    )

    t_end = DummyOperator(task_id='end')

    t_start >> t_write >> t_end  

Is there an environment variable I need to set, or should I use a GCS hook?


1 Answer


I got an answer on the Composer mailing list: "If you save operator output data to /home/airflow/gcs/data, it will automatically be synced to gs://{composer-bucket}/data."
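Following that advice, here is a minimal sketch of the adjusted callable, assuming the standard Composer mount point /home/airflow/gcs/data described in the mailing-list answer (the DAG wiring from the question stays the same):

import logging
from os import path

import numpy as np
import pandas as pd

# /home/airflow/gcs/data is mounted into the worker by Composer and synced
# to gs://{composer-bucket}/data, so anything written here lands in the bucket.
DATA_DIR = "/home/airflow/gcs/data"


def write_to_file():
    df = pd.DataFrame(data=np.random.randint(low=0, high=10, size=(5, 5)),
                      columns=['a', 'b', 'c', 'd', 'e'])
    logging.info("Saving results")

    # The file should appear as gs://{composer-bucket}/data/output.csv
    # once Composer syncs the folder.
    file_path = path.join(DATA_DIR, "output.csv")

    df.to_csv(path_or_buf=file_path, index=False)

If this sync behavior covers your use case, no GCS hook or extra environment variable is needed; Composer handles the synchronization itself.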

answered 2018-09-10T08:02:10.630