我正在使用 GCP Dataproc 和 Airflow 开发数据解决方案。
虽然通过 Airflow 创建 dataproc 集群很容易使用 DataprocCreateClusterOperator,但挑战是当 dataproc 集群创建由于正当原因而失败时 - 集群区域中的 IP 耗尽等。
我们为此尝试了两种方法-
使用继承自 DataprocCreateClusterOperator 的自定义运算符:使用它我们能够实现集群创建,但我们无法检查任务的 retry_number(由于自定义运算符中的 provider_context=True 参数不可用,这有助于 task_instance 对象)
使用 Python 运算符调用 DataprocCreateClusterOperator(接受 provider_context=True 并可以检查任务 retry_number):在这种情况下,python 运算符执行成功,但不会创建 dataproc 集群。
还有其他方法,例如在 Airflow 中分支,但这将有以下问题 -
代码冗余
为第一次和后续重试分别配置多个 dataproc。使用 PythonOperator 和 CustomOperator,可以以编程方式更新 Config。
如果在第一次任务运行中创建失败,是否有其他方法可以创建具有不同配置的 dataproc 集群。