6

我当前面临的问题是我在 MongoDB 集合中有文档,每个文档都需要由需要在非循环依赖图中运行的任务处理和更新。如果上游任务未能处理文档,则没有任何相关任务可以处理该文档,因为该文档尚未使用先决条件信息进行更新。

如果我要使用 Airflow,这给我留下了两个解决方案:

  1. 为每个文档触发一个 DAG,并使用 传递文档 ID --conf。问题在于这不是 Airflow 的预期使用方式。我永远不会运行预定的流程,根据文档在集合中的显示方式,我每天会制作 1440 个 Dagruns。

  2. 每个时期运行一个 DAG 以处理该时期在集合中创建的所有文档。这遵循 Airflow 的预期工作方式,但问题是,如果任务无法处理单个文档,则任何相关任务都无法处理任何其他文档。此外,如果一个文档比其他文档花费更长的时间来处理一个任务,那么那些其他文档正在等待该单个文档继续沿着 DAG 向下。

有比 Airflow 更好的方法吗?还是有比我目前看到的两种方法更好的方法在 Airflow 中处理这个问题?

4

5 回答 5

4

根据我在尝试回答这个问题时获得的知识,我得出的结论是,Airflow 并不是这项工作的工具。

Airflow 专为预定的幂等 DAG 设计。DagRun 还必须有一个唯一的execution_date; 这意味着在完全相同的开始时间运行相同的 DAG(如果我们同时收到两个文档,实际上是不可能的。当然,我们可以立即安排下一个 DagRun 连续,但是这个限制应该证明任何在某种程度上,尝试以这种方式使用 Airflow 永远是一种黑客行为。

我发现最可行的解决方案是改用Prefect,它的开发旨在克服 Airflow 的一些限制:

“Prefect 假设流程可以在任何时候以任何理由运行。”

Prefect 的 DAG 等价物是 Flow;我们可以利用的流程的一个关键优势是易于参数化。然后,通过一些线程,我们可以为流中的每个元素运行 Flow。这是一个示例流 ETL 管道:

import time
from prefect import task, Flow, Parameter
from threading import Thread
​
​
def stream():
    for x in range(10):
        yield x
        time.sleep(1)
​
​
@task
def extract(x):
    # If 'x' referenced a document, in this step we could load that document
    return x
​
​
@task
def transform(x):
    return x * 2
​
​
@task
def load(y):
    print("Received y: {}".format(y))
​
​
with Flow("ETL") as flow:
    x_param = Parameter('x')
    e = extract(x_param)
    t = transform(e)
    l = load(t)
​
for x in stream():
    thread = Thread(target=flow.run, kwargs={"x": x})
    thread.start()
于 2020-01-19T23:44:42.900 回答
0

我每天会制作 1440 个 Dagruns。

使用良好的 Airflow 架构,这是很有可能的。窒息点可能是

  1. executor - 例如使用 Celery Executor 而不是 Local Executor
  2. 后端数据库 - 根据需要进行监控和调整(索引、适当的存储等)
  3. webserver - 好吧,对于成千上万的 dagruns、任务等。也许只将 webeserver 用于 dev/qa 环境,而不是用于任务/dagruns 提交率较高的生产环境。您可以改用 cli 等。

另一种方法是通过运行多个 Airflow 实例进行扩展 - 将文档分区到十个桶,并将每个分区的文档分配给一个 Airflow 实例。

于 2019-10-16T21:21:44.217 回答
0

我们已经构建了一个系统,可以查询 MongoDB 的列表,并为每个包含一个 DAG 的项目生成一个 python 文件(注意:让每个 dag 都有自己的 python 文件有助于 Airflow 调度程序的效率,它是当前的设计) - 生成器 DAG 每小时运行一次,就在所有生成的 DAG 的计划每小时运行之前。

于 2020-01-20T15:31:15.820 回答
0

我会并行处理较重的任务并为下游提供成功的操作。据我所知,您不能将成功异步提供给下游任务,因此您仍然需要等待每个线程完成,直到向下游移动,但是,这仍然比为每条记录生成 1 个 dag 更容易接受,某事在这些行中:

任务1:通过一些时间戳读取mongo过滤(记住幂等性)和feed任务(即通过xcom);

任务2:通过PythonOperator并行做事,或者通过K8sPod更好,即:

def thread_fun(ret):
    while not job_queue.empty():
        job = job_queue.get()
        try:        
            ret.append(stuff_done(job))
        except:
            pass
    job_queue.task_done()
    return ret

# Create workers and queue
threads = []
ret = [] # a mutable object
job_queue = Queue(maxsize=0)

for thr_nr in appropriate_thread_nr:
    worker = threading.Thread(
        target=thread_fun,
        args=([ret])
    )
    worker.setDaemon(True)
    threads.append(worker)

# Populate queue with jobs
for row in xcom_pull(task_ids=upstream_task):
    job_queue.put(row)

# Start threads
for thr in threads:
    thr.start()

# Wait to finish their jobs
for thr in threads:
    thr.join()

xcom_push(ret)

任务 3:做更多来自上一个任务的事情,等等

于 2020-01-17T19:49:00.637 回答
0

您可以trigger_rule从“all_success”更改为“all_done”

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

并且还可以创建一个分支来处理trigger_rule设置为“one_failed”的失败文档,以以某种方式移动那些失败的文档(例如移动到“失败”文件夹并发送通知)

于 2019-10-16T18:11:38.007 回答