
I built an Airflow DAG with default settings as a tutorial exercise. When I run the DAG, some tasks fail. When I try to clear the failed tasks with airflow clear my_dag -s 2016-08-03 -t my_task_name -fd, I get the following exception:

TypeError: object.__new__(thread.lock) is not safe, use thread.lock.__new__()

Is this a problem with Airflow itself, or something on my end?

The full traceback is:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 379, in clear
    include_upstream=args.upstream,
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2870, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2856, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1974, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2856, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
    y = copier(memo)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1974, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 329, in _reconstruct
    y = callable(*args)
  File "/usr/lib/python2.7/copy_reg.py", line 93, in __newobj__
    return cls.__new__(cls, *args)
TypeError: object.__new__(thread.lock) is not safe, use thread.lock.__new__()

3 Answers


It looks like this is an open issue in Airflow: https://issues.apache.org/jira/browse/AIRFLOW-351

Answered 2016-08-15T23:45:36.547

Can you share your my_dag code? If you pass complex objects to your operators, you will run into problems with deepcopy.

See the example below. Here the config_file argument is a file, which is a complex type, and in this case the deepcopy problem appears. Instead of passing this argument, you can pass all the parameters as separate plain attributes, or you can set this properties-file variable inside the operator itself rather than passing it in as an argument.

job_flow = MySpecialOperator(
    task_id='create_ec2instance',
    aws_conn_id='service_aws',
    config_file='dev_ec2instance_parameters.cfg',
    initializer_task_id='initializer',
    dag=dag
)
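A minimal sketch of the failure mode described above (ToyOperator and Config are hypothetical stand-ins for illustration, not Airflow classes): an argument that is a plain string deepcopies without trouble, while an argument whose object graph contains a lock reproduces the thread.lock TypeError from the question.

```python
import copy
import threading

class ToyOperator:
    """Toy 'operator' that keeps its arguments as attributes,
    similar in spirit to how Airflow operators store their kwargs."""
    def __init__(self, task_id, config):
        self.task_id = task_id
        self.config = config

class Config:
    """A 'complex' argument: it carries a lock, which deepcopy cannot handle."""
    def __init__(self):
        self._lock = threading.Lock()

# Passing a plain string (e.g. a file path) deepcopies fine:
ok = ToyOperator("t1", "dev_ec2instance_parameters.cfg")
clone = copy.deepcopy(ok)

# Passing an object that holds a lock makes deepcopy raise TypeError:
bad = ToyOperator("t2", Config())
try:
    copy.deepcopy(bad)
except TypeError as e:
    print("deepcopy failed:", e)
```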
Answered 2019-07-21T15:53:05.250

Don't use deepcopy (it is almost certainly the wrong tool here). deepcopy copies every referenced object; in this case your object graph contains locks, and locks cannot be copied by deepcopy, so you get that error.
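The usual workaround for classes that must support deepcopy anyway is to define __deepcopy__ and replace the lock instead of copying it; the traceback shows Airflow's own models.py defines __deepcopy__ for exactly this reason. Job below is a hypothetical example class, not Airflow code:

```python
import copy
import threading

class Job:
    """Hypothetical class: plain data plus a runtime-only lock."""
    def __init__(self, config):
        self.config = config
        self._lock = threading.Lock()  # locks cannot be deepcopied

    def __deepcopy__(self, memo):
        # Copy every attribute except the lock; give the copy a fresh lock.
        cls = self.__class__
        clone = cls.__new__(cls)
        memo[id(self)] = clone
        for k, v in self.__dict__.items():
            setattr(clone, k,
                    threading.Lock() if k == "_lock"
                    else copy.deepcopy(v, memo))
        return clone

job = Job({"retries": 3})
clone = copy.deepcopy(job)  # works: the lock is replaced, not copied
```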


Answered 2016-08-15T21:36:35.427