2

我有一个 DAG

  1. 从云存储下载 csv 文件
  2. 通过 https 将 csv 文件上传到第三方

我正在执行的气流集群CeleryExecutor默认使用,所以我担心在某些时候当我扩大工作人员的数量时,这些任务可能会在不同的工作人员上执行。例如。工作人员 A 进行下载,工作人员 B 尝试上传,但找不到文件(因为它在工作人员 A 上)

是否有可能以某种方式保证下载和上传操作员都将在同一个气流工作人员上执行?

4

2 回答 2

1

对于这些类型的用例,我们有两种解决方案:

  1. 使用两个工作人员共享的网络安装驱动器,以便下载和上传任务都可以访问相同的文件系统
  2. 使用特定于工作人员的气流队列。如果只有一个工作人员在侦听此队列,您将保证两者都可以访问同一个文件系统。请注意,每个工作人员都可以侦听多个队列,因此您可以让它侦听“默认”队列以及用于此任务的自定义队列。
于 2017-08-23T19:45:18.747 回答
1

将第 1 步(csv 下载)和第 2 步(csv 上传)放入 subdag,然后通过 SubDagOperator 触发它,并将executor选项设置为 a SequentialExecutor- 这将确保第 1 步和第 2 步在同一个工作人员上运行。

这是一个说明该概念的工作 DAG 文件(实际操作被删除为 DummyOperators),在一些更大的过程的上下文中下载/上传步骤:

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.sequential_executor import SequentialExecutor

PARENT_DAG_NAME='subdaggy'
CHILD_DAG_NAME='subby'

def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date
        )

    task_download = DummyOperator(
        task_id = 'task_download_csv',
        dag=dag
        )

    task_upload = DummyOperator(
        task_id = 'task_upload_csv',
        dag=dag
        )

    task_download >> task_upload

    return dag
main_dag = DAG(
    PARENT_DAG_NAME,
    schedule_interval=None,
    start_date=datetime(2017,1,1)
)

main_task_1 = DummyOperator(
    task_id = 'main_1',
    dag = main_dag
)

main_task_2 = SubDagOperator(
    task_id = CHILD_DAG_NAME,
    subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
    executor=SequentialExecutor(),
    dag=main_dag
)

main_task_3 = DummyOperator(
    task_id = 'main_3',
    dag = main_dag
)

main_task_1 >> main_task_2 >> main_task_3
于 2017-08-25T21:15:37.080 回答