7

与 celery 相关的气流 cfg 设置为:

broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name

airflow webserver运行正常,但是在从气流 UI 运行任务时出现错误 。从 ui 运行任务时出错

我在运行气流调度程序时出错,tracecak 是:

 Traceback (most recent call last):
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 125, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 172, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 952, in run
executor.heartbeat()
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 124, in heartbeat
self.execute_async(key, command=command, queue=queue)
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 80, in execute_async
args=[command], queue=queue)
File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
**options
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 734, in send_task
with self.producer_or_acquire(producer) as P:
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 863, in producer_or_acquire
producer, self.producer_pool.acquire, block=True,
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 1233, in producer_pool
return self.amqp.producer_pool
File "/usr/local/lib/python2.7/dist-packages/celery/app/amqp.py", line 614, in producer_pool
self.app.connection_for_write()]
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 760, in connection_for_write
return self._connection(url or self.conf.broker_write_url, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 828, in _connection
'broker_connection_timeout', connect_timeout
File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 179, in __init__
if not get_transport_cls(transport).can_parse_url:
File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 83, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 64, in resolve_transport
raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: '

我的模块版本是:

  1. 气流==1.8
  2. 芹菜==4.1.0
  3. 海带==4.1.0
  4. 蟒蛇==2.7.12
4

1 回答 1

15

我在这个问题上浪费了很多时间,这个错误的原因是引号broker_url = 'amqp://guest:guest@rabbitmq_server:8080'只是删除引号:broker_url = amqp://guest:guest@rabbitmq_server:8080解决了这个问题。

于 2017-08-02T13:21:30.230 回答