尝试基于 KubernetesPodOperator 制作自己的组件。我能够定义组件并将其添加到组件列表中,但是在尝试运行它时,我得到:
节点“KubernetesPodOperator”的操作员“KubernetesPodOperator”未在可用操作员列表中配置。请将“KubernetesPodOperator”的完全限定包名称添加到 AirflowPipelineProcessor.available_airflow_operators 配置中。
和错误:
Traceback (most recent call last):
File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
result = await result
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
response = await PipelineProcessorManager.instance().process(pipeline)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
target_ops = self._cc_pipeline(pipeline, pipeline_name)
File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.
查看 src 代码后,我可以在processor_airflow.py中看到这些行:
# This specifies the default airflow operators included with Elyra. Any Airflow-based
# custom connectors should create/extend the elyra configuration file to include
# those fully-qualified operator/class names.
available_airflow_operators = ListTrait(
CUnicode(),
["airflow.operators.slack_operator.SlackAPIPostOperator",
"airflow.operators.bash_operator.BashOperator",
"airflow.operators.email_operator.EmailOperator",
"airflow.operators.http_operator.SimpleHttpOperator",
"airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
"airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines. These operators must
be fully qualified (i.e., prefixed with their package names).
""",
).tag(config=True)
我不确定这是否可以从客户端扩展。