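I'm trying to run a very simple test DAG to get to grips with the basics of GCP Cloud Composer, but every time I trigger the DAG I get the same error, and I can't find any information on how to resolve it.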

The error is:

[2020-03-18 22:20:56,627] {taskinstance.py:1059} ERROR - __init__() got an unexpected keyword argument 'min'@-@{"workflow": "notebook-test", "task-id": "notebook-test", "execution-date": "2020-03-18T22:20:41.232043+00:00"}
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/test.py", line 44, in execute_nb
    parameters=params
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/execute.py", line 104, in execute_notebook
    **engine_kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 49, in execute_notebook_with_engine
    return self.get_engine(engine_name).execute_notebook(nb, kernel_name, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 341, in execute_notebook
    nb_man.notebook_start()
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 198, in notebook_start
    self.save()
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/engines.py", line 139, in save
    write_ipynb(self.nb, self.output_path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 397, in write_ipynb
    papermill_io.write(nbformat.writes(nb), path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 128, in write
    return self.get_handler(path).write(buf, path)
  File "/opt/python3.6/lib/python3.6/site-packages/papermill/iorw.py", line 316, in write
    multiplier=self.RETRY_MULTIPLIER, min=self.RETRY_DELAY, max=self.RETRY_MAX_DELAY
TypeError: __init__() got an unexpected keyword argument 'min'

The code for my DAG is:

import airflow
import papermill as pm
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'end_date': None,
    'retries': 0,    
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    dag_id="notebook-test",
    description="a test",
    default_args=default_args,
    catchup=True,
    schedule_interval=None,
    dagrun_timeout=(timedelta(seconds=30))
)

NB_PATH = "gs://BUCKET/data/"

params = {}


def execute_nb():
    input_nb = NB_PATH + "test.ipynb"
    output_nb = NB_PATH + "test_ran.ipynb"

    pm.execute_notebook(
        input_nb,
        output_nb,
        parameters=params
    )


op = PythonOperator(
    task_id="notebook-test",
    python_callable=execute_nb,
    dag=dag
)

op

One solution I have already tried, from https://github.com/nteract/papermill/issues/445, is to update the version of Tenacity, but adding it to the PyPI Packages tab of my Cloud Composer environment did not fix anything.

Any help would be much appreciated, thanks!

Edit: the image version is composer-1.9.2-airflow.1.10.6.

1 Answer

It turns out the issue was with the paths being supplied.

I had to add import os and from pathlib import Path, and then set my variable to

NB_PATH = str(Path(os.path.abspath(__file__)).parents[1]) + "/data"

This also required adding jupyter as a PyPI dependency for papermill to work properly, but it now seems to be working!
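
For anyone adapting this, here is a minimal sketch of how the local path can be derived from the DAG file's location. The helper name notebook_paths and the "_ran" output suffix handling are my own illustration, not part of papermill or Airflow. It assumes the usual Cloud Composer layout, where the bucket's dags/ and data/ folders are mounted side by side on the worker (the traceback in the question shows the DAG at /home/airflow/gcs/dags/test.py). It builds paths with pathlib's / operator so that no separator goes missing between "data" and the file name.

```python
import os
from pathlib import Path


def notebook_paths(dag_file, name="test.ipynb"):
    """Return (input, output) notebook paths in the data/ folder beside dags/.

    Hypothetical helper for illustration only.
    """
    # For <root>/dags/test.py, parents[0] is <root>/dags and parents[1] is <root>.
    data_dir = Path(os.path.abspath(dag_file)).parents[1] / "data"
    stem, ext = os.path.splitext(name)
    return str(data_dir / name), str(data_dir / f"{stem}_ran{ext}")


# DAG file location taken from the traceback in the question;
# inside a real DAG file you would pass __file__ instead.
input_nb, output_nb = notebook_paths("/home/airflow/gcs/dags/test.py")
print(input_nb)
print(output_nb)
```

The two strings can then be passed to pm.execute_notebook in place of the gs:// paths used in the question.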

Answered 2020-03-20T20:29:46.023