0

我有一个由两个任务组成的简单 Airflow 工作流程。其中一个任务会下载包含股票数据的 csv 文件,另一个任务提取最高股票价格并把数据写入另一个文件。

如果我先运行第一个任务、再运行第二个任务,一切正常;但如果直接执行 airflow run stocks_d get_max_share,它会报无法满足依赖关系(dependencies not met)。

import csv
import os
from datetime import datetime
from datetime import timedelta

import requests

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def get_stock_data():
    """Download historical stock data as CSV and save it to /tmp/stocks/airflow_stock_data.txt.

    Raises:
        requests.RequestException: on a network failure or timeout.
        requests.HTTPError: if the server answers with an error status,
            instead of silently writing the error page to the data file.
    """
    url = "https://app.quotemedia.com/quotetools/getHistoryDownload.csv?&webmasterId=501&startDay=02&startMonth=02&startYear=2002&endDay=02&endMonth=07&endYear=2009&isRanged=false&symbol=APL"
    # A timeout keeps a hung connection from blocking the task forever.
    r = requests.get(url, timeout=30)
    # Fail loudly on 4xx/5xx — otherwise the downstream task would try to
    # parse an HTML error page as CSV.
    r.raise_for_status()
    # The worker may not have the target directory yet.
    os.makedirs('/tmp/stocks', exist_ok=True)
    with open('/tmp/stocks/airflow_stock_data.txt', 'w') as f:
        f.write(r.text)

def get_max_share():
    """Find the highest stock price in the downloaded CSV and write it out.

    Reads /tmp/stocks/airflow_stock_data.txt (CSV with a header row; column 0
    is the date, column 2 the price) and writes "date -> price" for the
    numerically highest price to /tmp/stocks/max_stock.

    Raises:
        ValueError: if the input file contains no data rows.
    """
    stock_max = {}  # price string -> date on which that price occurred
    with open('/tmp/stocks/airflow_stock_data.txt', 'r') as f:
        stock_reader = csv.reader(f)
        next(stock_reader, None)  # skip the header row
        for row in stock_reader:
            stock_max[row[2]] = row[0]

    if not stock_max:
        raise ValueError('no stock data rows found in /tmp/stocks/airflow_stock_data.txt')

    # Compare prices numerically: a plain max() over the raw strings is
    # lexicographic, so e.g. '9.50' would beat '10.20'.
    stock_price = max(stock_max, key=float)
    stock_max_price_date = stock_max[stock_price]

    with open('/tmp/stocks/max_stock', 'w') as f:
        stock_entry = stock_max_price_date + ' -> ' + stock_price
        f.write(stock_entry)


# Task-level defaults applied to every operator in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 30),
    'email': ['mainl@domain.io'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    }

# NOTE: 'catchup' is a DAG-level argument, not a task default — inside
# default_args it is silently ignored, and with a 2017 start_date plus a
# 5-minute schedule the scheduler would backfill every missed interval.
# It must be passed to DAG() directly.
dag = DAG(
    'stocks_d',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    catchup=False,
)


# Wire up the two tasks: download the raw CSV first, then extract the max.
task_get_stocks = PythonOperator(
    task_id='get_stocks',
    python_callable=get_stock_data,
    dag=dag,
)
task_get_max_share = PythonOperator(
    task_id='get_max_share',
    python_callable=get_max_share,
    dag=dag,
)

# Equivalent to task_get_max_share.set_upstream(task_get_stocks).
task_get_stocks >> task_get_max_share

任何想法为什么会发生这种情况?

4

1 回答 1

0

$ airflow run stocks_d get_max_share 上面的命令只会运行 get_max_share 这一个任务,并不会先运行它的上游任务。

如果您需要触发整个 DAG 的运行,请尝试以下命令:$ airflow trigger_dag stocks_d

于 2017-05-30T10:36:01.387 回答