-1

我们在 Kubernetes 上运行气流。下面是我的airflowlocalsettings

airflowLocalSettings: |
      from airflow.contrib.kubernetes.pod import Pod, Resources
      from airflow.configuration import conf
      def pod_mutation_hook(pod: Pod):
          pod.labels.update({"app": "airflow-pod"})
          pod.annotations.update({"iam.amazonaws.com/role": "role"})
          pod.tolerations += [{"key": "spotInstance", "operator": "Exists"}]
          pod.resources = Resources(limit_memory = "512Mi", limit_cpu = "300m")
          pod.affinity.update({
            "nodeAffinity": {
              "preferredDuringSchedulingIgnoredDuringExecution": [
                {"weight": 100, "preference": {
                  "matchExpressions": [{
                    "key": "role.node.kubernetes.io/spot-worker",
                    "operator": "In",
                    "values": ["spot-worker"]
                  }]
                }}
              ]
            }
          })

我想启动一些选择性任务 pod,它们有自己的资源通过resources参数传递KubernetesPodOperator。现在的问题是 Resources 被覆盖,它从pod_mutation_hook.

我们如何绕过 pod_mutation 资源,让这些 pod 拥有自己的资源设置?我无法从中删除资源设置,pod_mutation_hook因为它也被其他 pod 使用。

4

1 回答 1

0

我通过调整 pod_mutaion_hook 代码解决了这个问题,如下所示。

resources = pod.resources
is_class = isinstance(resources, Resources)
if is_class:
    resource_flag = resources.is_empty_resource_request()
else:
    resource_flag = False
if resource_flag:
    pod.resources = Resources(limit_memory="512Mi", limit_cpu="300m")

基本上我正在检查资源是否已经在 pod 中可用,然后采取行动。

于 2021-06-06T10:25:56.030 回答