我正在评估像 Airflow 这样的框架,它能够在运行工作流时在运行时构建任务的动态图,即在开始工作流之前我不具体了解任务及其依赖关系,我只知道图的级别数。
我开始使用 Airflow,我正在使用 XCom 来保持图表的状态,如下所述: Proper way to create dynamic workflows in Airflow
我还通过在 XCom 行中存储包含任务依赖关系描述的 JSON 片段来扩展这种方法,例如:
{
"key": "first_file",
"tasks" :
[
{
"task_id" : "third_task",
"dependencies" : ["first_task", "second_task"]
}
]
}
请注意,我真的不需要重新运行我的 DAG:我的 DAG 是要在外部安排的,一旦我的第一个 DagRun 完成,之后将不会删除/删除/修改任何新任务。如果需要重新运行,我将创建一个新的 DAG。
我使用的技术如下:我用两个任务创建 DAG,一个是传感器(这保证了 DagRun 始终处于运行状态直到结束)
class WaitTasksSensor(BaseSensorOperator):
...
def poke(self, context):
files = os.list_dir(MY_DIR)
for f in files:
filename = os.path.join(MY_DIR, f)
json_file = open(filename).read()
json_dict = json.loads(json_file)
key = json_dict["key"]
self.xcom_push(context, key + "_" + self.dag_id, json_file)
# This sensor completes successfully only when the "end" task appears in the graph
last_task_id = "end_" + self.dag_id
return last_task_id in self.dag.task_ids
def create_dags(dag_id):
with DAG(dag_id, schedule_interval=None):
first = DummyOperator(task_id="first_" + dag_id)
wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
first >> wait_sensor
pull_tasks(wait_sensor) # Code below
dag = create_dags("dag_1")
当传感器推送代表任务及其依赖项的 JSON 文件(这些文件不断出现在一个文件夹中)时,我尝试在 DAG 代码中从 XCom 中提取任务。
def pull_tasks(previous_task):
current_dag = previous_task.dag
dag_id = current_dag.dag_id
last_run = current_dag.get_last_dagrun(include_externally_triggered=True)
if not last_run:
return
last_run_date = last_run.execution_date
task_instances = previous_task.get_task_instances(start_date=last_run_date)
if not task_instances:
return
last_task_instance = task_instance[-1]
json_ids = [...]
for json_id in json_ids:
task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
key=json_id + "_" + dag_id,
dag_id=dag_id)
if task_graph:
task_graph_deserialized = json.loads(task_graph_json)
tasks = task_graph_deserialized["tasks"]
create_dynamic_tasks(dag, task_dicts)
def create_dynamic_tasks(dag, task_dicts):
dag_id = dag.dag_id
for task_dict in task_dicts:
task = DummyOperator(task_id=task_id + "_" + dag_id,
dag=dag)
dependencies = task_dict["dependencies"]
for predecessor_id in dependencies:
predecessor = dag.get_task(predecessor_id + "_" + dag_id)
predecessor >> task
我的问题是:Airflow 是这种用例的有效工具吗?或者我是否将其与主要用例相距甚远(即在运行时未生成静态任务的固定工作流)?
这种方法是否可以扩展到数万个 DAG 和数十万个任务?或者是否有任何其他类似的工具可以以更简单的方式实现这一目标?