0

我正在评估像 Airflow 这样的框架,它能够在运行工作流时在运行时构建任务的动态图,即在开始工作流之前我不具体了解任务及其依赖关系,我只知道图的级别数。

我开始使用 Airflow,我正在使用 XCom 来保持图表的状态,如下所述: Proper way to create dynamic workflows in Airflow

我还通过在 XCom 行中存储包含任务依赖关系描述的 JSON 片段来扩展这种方法,例如:

{
    "key": "first_file",
    "tasks" :
        [
            {
              "task_id" : "third_task",
              "dependencies" : ["first_task", "second_task"]
            }
        ]
}

请注意,我真的不需要重新运行我的 DAG:我的 DAG 是要在外部安排的,一旦我的第一个 DagRun 完成,之后将不会删除/删除/修改任何新任务。如果需要重新运行,我将创建一个新的 DAG。

我使用的技术如下:我用两个任务创建 DAG,一个是传感器(这保证了 DagRun 始终处于运行状态直到结束)

class WaitTasksSensor(BaseSensorOperator):
    ...
    def poke(self, context):
        files = os.list_dir(MY_DIR)

        for f in files:
           filename = os.path.join(MY_DIR, f)
           json_file = open(filename).read()
           json_dict = json.loads(json_file)
           key = json_dict["key"]
           self.xcom_push(context, key + "_" + self.dag_id, json_file)

        # This sensor completes successfully only when the "end" task appears in the graph
        last_task_id = "end_" + self.dag_id
        return last_task_id in self.dag.task_ids


def create_dags(dag_id):
    with DAG(dag_id, schedule_interval=None):
        first = DummyOperator(task_id="first_" + dag_id)

        wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
        first >> wait_sensor

        pull_tasks(wait_sensor) # Code below
dag = create_dags("dag_1")

当传感器推送代表任务及其依赖项的 JSON 文件(这些文件不断出现在一个文件夹中)时,我尝试在 DAG 代码中从 XCom 中提取任务。

def pull_tasks(previous_task):
   current_dag = previous_task.dag
   dag_id = current_dag.dag_id
   last_run = current_dag.get_last_dagrun(include_externally_triggered=True)
  
   if not last_run:
       return

   last_run_date = last_run.execution_date
   task_instances = previous_task.get_task_instances(start_date=last_run_date)

   if not task_instances:
       return

   last_task_instance = task_instance[-1]

   json_ids = [...]

   for json_id in json_ids:
       task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
                                                      key=json_id + "_" + dag_id,
                                                      dag_id=dag_id)
    
       if task_graph:
           task_graph_deserialized = json.loads(task_graph_json)
           tasks = task_graph_deserialized["tasks"]
           create_dynamic_tasks(dag, task_dicts)

def create_dynamic_tasks(dag, task_dicts):
   dag_id = dag.dag_id

   for task_dict in task_dicts:
       task = DummyOperator(task_id=task_id + "_" + dag_id,
                            dag=dag)
       dependencies = task_dict["dependencies"]
 
       for predecessor_id in dependencies:
           predecessor = dag.get_task(predecessor_id + "_" + dag_id)
           predecessor >> task

我的问题是:Airflow 是这种用例的有效工具吗?或者我是否将其与主要用例相距甚远(即在运行时未生成静态任务的固定工作流)?

这种方法是否可以扩展到数万个 DAG 和数十万个任务?或者是否有任何其他类似的工具可以以更简单的方式实现这一目标?

4

1 回答 1

1

你的问题似乎类似于这个问题在那里的答案中,如果您真的必须使用 Airflow ,我会建议一个丑陋的解决方案。

但是,您的问题的答案是:我建议您查看Argo Workflows。由于它完全在 Kubernetes 上运行,因此它也很容易扩展。

于 2020-11-14T16:01:05.223 回答