
Consider the following DAG example, in which the first task, get_id_creds, extracts a list of credentials from the database. This operation tells me which users in the database are cleared to run further data preprocessing, and it writes those IDs to the file /tmp/ids.txt. I then read those IDs back into my DAG and use them to generate a list of upload_transactions tasks that can run in parallel.

My question is: is there a more idiomatic, correct, and dynamic way to do this with Airflow? What I have here feels clumsy and brittle. How can I pass a list of valid IDs directly from one process to the process that defines the subsequent downstream processes?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now()
}

# schedule_interval is a DAG argument, not an operator default
DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args,
  schedule_interval=None
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    # get_id_creds must finish before each upload task can run
    upload_transactions.set_upstream(get_id_creds)

2 Answers


Per @Juan Riza's suggestion I checked out this link: Proper way to create dynamic workflows in Airflow. This was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  # 'start_date': datetime.now(),
  'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

clear_tables cleans out the database tables that will be re-built as a result of the process. id_worker is a function that dynamically generates a new preprocessing task for each of the ID values returned by get_id_creds. The task ID is just the corresponding user ID, though it could easily have been an index, i, as in the example mentioned above.
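For context, here is a minimal sketch of the shape dash_workers might take; everything in it (the sqlite3 backend, the table names, and the SQL) is hypothetical and only illustrates the interface the DAG relies on:

# dash_workers.py -- hypothetical sketch, not the real module at DASH_PREPROC_PATH
import sqlite3

DB_PATH = '/tmp/dash.db'  # placeholder database; the real project uses its own store

def get_id_creds():
    # Return the list of user IDs that are cleared for preprocessing.
    with sqlite3.connect(DB_PATH) as conn:
        rows = conn.execute('SELECT user_id FROM credentials WHERE active = 1').fetchall()
    return [str(row[0]) for row in rows]

def clear_db():
    # Empty the tables that the preprocessing run will repopulate.
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute('DELETE FROM transactions')

def main_preprocess(uid):
    # Run the preprocessing pipeline for a single user ID.
    print('preprocessing user %s' % uid)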

NOTE: The bitshift operator (>>) looked backwards to me at first, since clear_tables is the task that must run first, but clear_tables >> id_worker(uid) does in fact set clear_tables as the upstream task, and it works in this case.
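For reference, the bitshift operators are just shorthand for set_upstream/set_downstream; all four lines below (with a placeholder worker_task) declare the same dependency, with clear_tables running first:

clear_tables >> worker_task              # same as clear_tables.set_downstream(worker_task)
worker_task << clear_tables              # same as worker_task.set_upstream(clear_tables)
clear_tables.set_downstream(worker_task)
worker_task.set_upstream(clear_tables)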

Answered 2017-07-27T23:44:16.367

Consider that Apache Airflow is a workflow management tool, i.e. it determines the dependencies between tasks that the user defines, whereas Apache NiFi, for example, is a dataflow management tool, i.e. there the dependencies are the data passed between tasks.

That said, I think your approach is right (my comment is based on the posted code), but Airflow offers a feature called XCom. It allows tasks to "cross-communicate" by passing some data between them. How big can the passed data be? That is for you to test, but in general it should not be very large. It is stored as key-value pairs in the Airflow metadata database, so you cannot pass files, for example, but a list of IDs should work.
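As a rough sketch of how the question's DAG could use XCom instead of the /tmp/ids.txt file (the callables below are hypothetical wrappers around the question's dash_workers functions, and the 'ids' key is just an illustrative name):

def push_id_creds(**context):
    # Assume dash_workers.get_id_creds() returns a plain list of user IDs.
    ids = dash_workers.get_id_creds()
    context['ti'].xcom_push(key='ids', value=ids)

def pull_and_upload(**context):
    # Pull the ID list published by the upstream task and process each ID.
    ids = context['ti'].xcom_pull(task_ids='get_id_creds', key='ids')
    for uid in ids:
        dash_workers.upload_transactions(uid)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=push_id_creds,
    provide_context=True,
    dag=DAG)

upload_transactions = PythonOperator(
    task_id='upload_transactions',
    python_callable=pull_and_upload,
    provide_context=True,
    dag=DAG)

get_id_creds >> upload_transactions

Note that XCom values only exist at run time, so this version pulls the whole list inside a single downstream task instead of fanning out one task per ID at DAG-definition time.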

Like I said, you should test it for yourself, and I would be happy to hear about your experience. Here is an example DAG that demonstrates the use of XCom, and here is the necessary documentation. Cheers!

Answered 2017-07-26T07:00:46.830