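I'm running into an annoying problem with Airflow: many tasks keep getting stuck in the queued state in the UI, and to get them running again I have to restart the scheduler and the workers. My Airflow setup uses the CeleryExecutor, backed by Redis, running on 2 workers.
I looked at the worker logs, and they show this: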
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: airflow.exceptions.AirflowException: dag_id could not be found: dc2_phd_nw_5225_processing. Either the dag did not exist or it failed to parse.
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: [2018-02-05 06:53:37,385: ERROR/ForkPoolWorker-17] Command 'airflow run dc2_phd_nw_5225_processing phd_5225_stage_4_add_new_gcs_segments_to_etl_unload_C 2018-02-04T02:00:00 --local --pool dc2 -sd /home/airflow/airflow/dags/doubleclick/dc2_processing.py' returned non-zero exit status 1
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: [2018-02-05 06:53:37,388: ERROR/ForkPoolWorker-17] Task airflow.executors.celery_executor.execute_command[a1821a3b-5ca5-430f-84ce-eb0625a7bbca] raised unexpected: AirflowException('Celery command failed',)
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: Traceback (most recent call last):
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: File "/usr/local/lib/python3.5/dist-packages/airflow/executors/celery_executor.py", line 56, in execute_command
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: subprocess.check_call(command, shell=True)
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: File "/usr/lib/python3.5/subprocess.py", line 581, in check_call
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: raise CalledProcessError(retcode, cmd)
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: subprocess.CalledProcessError: Command 'airflow run dc2_phd_nw_5225_processing phd_5225_stage_4_add_new_gcs_segments_to_etl_unload_C 2018-02-04T02:00:00 --local --pool dc2 -sd /home/airflow/airflow/dags/doubleclick/dc2_processing.py' returned non-zero exit status 1
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: During handling of the above exception, another exception occurred:
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: Traceback (most recent call last):
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: File "/usr/local/lib/python3.5/dist-packages/celery/app/trace.py", line 367, in trace_task
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: R = retval = fun(*args, **kwargs)
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: File "/usr/local/lib/python3.5/dist-packages/celery/app/trace.py", line 622, in __protected_call__
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: return self.run(*args, **kwargs)
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: File "/usr/local/lib/python3.5/dist-packages/airflow/executors/celery_executor.py", line 59, in execute_command
Feb 05 06:53:37 ip-172-31-46-75 airflow[3656]: raise AirflowException('Celery command failed')
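The log above contains two chained exceptions: `subprocess.check_call` raises `CalledProcessError` when the `airflow run ...` child process exits with status 1, and `execute_command` then raises `AirflowException('Celery command failed')` from inside that handler. The minimal sketch below reproduces only that wrapping pattern with the standard library. It shows why the Celery traceback itself only carries the generic message, while the underlying failure has to be read from the child process's own output (here, the `dag_id could not be found` line). `CommandFailed` and the stand-in command are hypothetical and are not Airflow's actual code.

```python
import subprocess
import sys


class CommandFailed(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def execute_command(command):
    # Same shape as the traceback: run the command in a child process and
    # turn any non-zero exit status into a generic error.
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError:
        # Raising inside the except block chains the exceptions implicitly
        # (sets __context__), which is what prints
        # "During handling of the above exception, another exception occurred".
        raise CommandFailed('Celery command failed')


if __name__ == '__main__':
    # A child process that exits with status 1, standing in for the
    # failing `airflow run ...` invocation.
    failing = '"{}" -c "import sys; sys.exit(1)"'.format(sys.executable)
    try:
        execute_command(failing)
    except CommandFailed as exc:
        cause = exc.__context__
        print(type(cause).__name__, cause.returncode)
```

Only the exit status survives the wrapping; the child's actual error message is whatever it wrote to its own stdout/stderr.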
I followed this solution, which says to add --raw to the airflow run command to see the real exception, and it shows the following:
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 403, in run
    print("Logging into: " + filename)
UnboundLocalError: local variable 'filename' referenced before assignment
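The code around line 403 of `cli.py` is not shown here, so the sketch below only illustrates the generic Python pattern that raises this error: a local variable bound on some code paths and then read unconditionally. If that is what happens in the `--raw` run, this error may be coming from the CLI's own logging setup rather than from the original task failure. The functions `log_target` and `log_target_fixed` and the path `/tmp/example.log` are hypothetical.

```python
def log_target(log_to_file):
    # `filename` is only bound when log_to_file is true; on the other path
    # the print below reads a local that was never assigned.
    if log_to_file:
        filename = '/tmp/example.log'  # hypothetical path
    print("Logging into: " + filename)
    return filename


def log_target_fixed(log_to_file):
    # Binding a default up front makes every code path well-defined.
    filename = None
    if log_to_file:
        filename = '/tmp/example.log'  # hypothetical path
    if filename is not None:
        print("Logging into: " + filename)
    return filename


if __name__ == '__main__':
    try:
        log_target(False)
    except UnboundLocalError as exc:
        print(type(exc).__name__)
    print(log_target_fixed(False))
```

The exact wording of the message differs between Python versions (newer releases say "cannot access local variable"), but the exception type is the same.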
Has anyone had the same problem, or does anyone know how to fix it?