2

尝试基于 KubernetesPodOperator 制作自己的组件。我能够定义组件并将其添加到组件列表中,但是在尝试运行它时,我得到:

节点“KubernetesPodOperator”的操作员“KubernetesPodOperator”未在可用操作员列表中配置。请将“KubernetesPodOperator”的完全限定包名称添加到 AirflowPipelineProcessor.available_airflow_operators 配置中。

和错误:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
    result = await result
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
    response = await PipelineProcessorManager.instance().process(pipeline)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
    res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
    pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
    target_ops = self._cc_pipeline(pipeline, pipeline_name)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
    raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators.  Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.

查看 src 代码后,我可以在processor_airflow.py中看到这些行:

 # This specifies the default airflow operators included with Elyra.  Any Airflow-based
    # custom connectors should create/extend the elyra configuration file to include
    # those fully-qualified operator/class names.
    available_airflow_operators = ListTrait(
        CUnicode(),
        ["airflow.operators.slack_operator.SlackAPIPostOperator",
         "airflow.operators.bash_operator.BashOperator",
         "airflow.operators.email_operator.EmailOperator",
         "airflow.operators.http_operator.SimpleHttpOperator",
         "airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
         "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
        help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines.  These operators must
be fully qualified (i.e., prefixed with their package names).
       """,
    ).tag(config=True)

我不确定这是否可以从客户端扩展。

4

1 回答 1

2

available_airflow_operators 列表是 Elyra 中的可 配置特征 。您必须将完全限定的包名称添加 KubernetesPodOperator 到此列表中,才能正确创建 DAG。

为此,请从命令行使用 jupyter elyra --generate-config. 打开创建的文件并添加以下行( PipelineProcessor(LoggingConfigurable) 如果您希望保持文件井井有条,可以将其添加到标题下):

c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")

如果不是上述情况,请将该字符串值更改为您的用例的正确包(确保它以所需运算符的类名结尾)。如果需要添加多个包,也可以使用extend而不是append.

编辑:是相关文档的链接

于 2022-02-16T00:16:39.177 回答