
How can I run the Kubernetes Spark operator from Airflow, which is deployed as a service in a Kubernetes environment? I'd appreciate any help figuring out what the problem is.


Here is the task log from the Kubernetes cluster connection:

*** Log file does not exist: /opt/airflow/logs/spark_pi/spark_pi_submit/2022-03-03T05:44:20.891535+00:00/16.log
*** Fetching from: http://airflow-worker-0.airflow-worker.airflow.svc.cluster.local:8793/log/spark_pi/spark_pi_submit/2022-03-03T05:44:20.891535+00:00/16.log

[2022-03-04, 08:03:25 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: spark_pi.spark_pi_submit scheduled__2022-03-03T05:44:20.891535+00:00 [queued]>
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: spark_pi.spark_pi_submit scheduled__2022-03-03T05:44:20.891535+00:00 [queued]>
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1238} INFO - 
--------------------------------------------------------------------------------
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1239} INFO - Starting attempt 16 of 16
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1240} INFO - 
--------------------------------------------------------------------------------
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1259} INFO - Executing <Task(SparkKubernetesOperator): spark_pi_submit> on 2022-03-03 05:44:20.891535+00:00
[2022-03-04, 08:03:25 UTC] {standard_task_runner.py:52} INFO - Started process 81 to run task
[2022-03-04, 08:03:25 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'spark_pi', 'spark_pi_submit', 'scheduled__2022-03-03T05:44:20.891535+00:00', '--job-id', '30', '--raw', '--subdir', 'DAGS_FOLDER/dags/sparkTest.py', '--cfg-path', '/tmp/tmpfb3gy00l', '--error-file', '/tmp/tmp_l18y_z6']
[2022-03-04, 08:03:25 UTC] {standard_task_runner.py:77} INFO - Job 30: Subtask spark_pi_submit
[2022-03-04, 08:03:25 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: spark_pi.spark_pi_submit scheduled__2022-03-03T05:44:20.891535+00:00 [running]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=spark_pi
AIRFLOW_CTX_TASK_ID=spark_pi_submit
AIRFLOW_CTX_EXECUTION_DATE=2022-03-03T05:44:20.891535+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-03-03T05:44:20.891535+00:00
[2022-03-04, 08:03:25 UTC] {spark_kubernetes.py:68} INFO - Creating sparkApplication
[2022-03-04, 08:03:25 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 175, in create_custom_object
    group=group, version=version, namespace=namespace, plural=plural, body=body
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
    (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/custom_objects_api.py", line 289, in create_namespaced_custom_object_with_http_info
    collection_formats=collection_formats)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 345, in call_api
    _preload_content, _request_timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 176, in __call_api
    _request_timeout=_request_timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 388, in request
    body=body)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 278, in POST
    body=body)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '17721f7b-9933-415b-aeef-071c4fef5e53', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '0754a757-53b5-4f73-b4f1-e0b93022c0b6', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'a1e5f894-22f7-4a73-93fd-fc61b7cf9790', 'Date': 'Fri, 04 Mar 2022 08:03:25 GMT', 'Content-Length': '313'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Internal error occurred: Object 'apiVersion' is missing in 'object has no apiVersion field'","reason":"InternalError","details":{"causes":[{"message":"Object 'apiVersion' is missing in 'object has no apiVersion field'"}]},"code":500}


Here is the Airflow DAG file that invokes the Kubernetes Spark operator:

from datetime import datetime, timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
import pathlib
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# [END import_module]


# [START instantiate_dag]

dag = DAG(
    'spark_pi',
    max_active_runs=1,  # DAG-level setting; it does not belong in default_args
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="default",
    application_file=pathlib.Path("/opt/airflow/dags/repo/yaml/sparkTest.yaml").read_text(),
    kubernetes_conn_id="kube_test",
    # in_cluster=True, 
    do_xcom_push=True,
    dag=dag,
)

t1 
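
The application_file argument reads the manifest with pathlib and hands the raw text to the operator, which posts it to the Kubernetes custom objects API. As a quick check against the error shown below, the file can be loaded locally to confirm it parses into a mapping with top-level apiVersion and kind fields. This is only a minimal sketch; it assumes PyYAML is available in the Airflow image and reuses the path from the DAG above.

# Minimal sketch: sanity-check the SparkApplication manifest that the DAG
# above reads via application_file. Assumes PyYAML is installed; the path
# is the same one passed to pathlib in the DAG.
import pathlib
import yaml

manifest_path = pathlib.Path("/opt/airflow/dags/repo/yaml/sparkTest.yaml")
docs = list(yaml.safe_load_all(manifest_path.read_text()))

print(f"YAML documents found: {len(docs)}")
for doc in docs:
    if isinstance(doc, dict):
        # The custom objects API rejects a body without these two fields,
        # which matches the 500 response in the log above.
        print("apiVersion:", doc.get("apiVersion"), "| kind:", doc.get("kind"))
    else:
        print("document is not a mapping:", type(doc).__name__)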

When I run the Airflow DAG, it fails with the message above saying that the Kubernetes apiVersion cannot be found.
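
For reference, a SparkApplication manifest for the Kubernetes Spark operator normally declares apiVersion and kind at the top level, which is exactly what the API server reports as missing. The sketch below is only illustrative (the contents of sparkTest.yaml are not shown above); the image, jar path, and service account are placeholders.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"  # placeholder image
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"  # placeholder jar
  sparkVersion: "3.1.1"
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark  # placeholder service account
  executor:
    cores: 1
    instances: 1
    memory: "512m"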
