This is my directory structure:
app
|-- automate-scan.py
|-- helpers
|   |-- __init__.py
|   |-- __init__.pyc
|   |-- tasks.py
|   `-- tasks.pyc
|-- __init__.py
`-- __init__.pyc
Here are the contents of what I am running:
automate-scan.py
import os
from configobj import ConfigObj
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
celapp = Celery(backend='redis://localhost:6379/0', broker='amqp://user:pass@localhost/my_vhost')
CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('initialScanning'),),
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['json', 'pickle']
}
celapp.conf.update(**CELERY_CONFIG)
config_file = '/home/myuser/config.cfg'
config_obj = ConfigObj(config_file)
repo_list_file = config_obj.get('files').get('repo_file')
@celapp.task()
def run_scan(repo):
    print repo
    os.system("git clone " + repo)
    repo_to_scan = repo.split('/')[1].split('.')[0]
    os.system("python /home/someuser/myscript.py file:///home/someuser/repo_collection/" + repo_to_scan + " --json --regex")


@celapp.task()
def final_task():
    print "hakuna matata"
repositories, in the code below, is a list of strings.
tasks.py
import os
from configobj import ConfigObj
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
from celery import chord
from helpers import tasks
f = open('/home/someuser/repositories.lst')
os.system("mkdir /home/someuser/repo_collection")
repositories = f.read().splitlines()
f.close()
print repositories
print "\n\n"
created_task = (tasks.run_scan.subtask((repo)) for (repo) in repositories)
finaltask = tasks.final_task.subtask()
res = chord(created_task,queue='initialScanning')(finaltask)
When I run automate-scan.py, I get the following error:
Traceback (most recent call last):
  File "app/automate-scan.py", line 20, in <module>
    res = chord(created_task,queue='initialScanning')(finaltask)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1189, in __call__
    return self.apply_async((), {'body': body} if body else {}, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1232, in apply_async
    return self.run(tasks, body, args, task_id=task_id, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1277, in run
    header_result = header(*partial_args, task_id=group_id, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 953, in __call__
    return self.apply_async(partial_args, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 978, in apply_async
    args=args, kwargs=kwargs, **options))
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1054, in _apply_tasks
    **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 218, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 513, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: run_scan() takes exactly 1 argument (50 given)
I don't understand where I am passing 50 arguments. Please let me know where I am going wrong and what I need to change to fix this.
Edit 1: I also tried changing the line
created_task = (tasks.run_scan.subtask((repo)) for (repo) in repositories)
to
created_task = (tasks.run_scan.subtask(repo) for repo in repositories)
as well, but I get the same result.
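
A likely explanation, sketched in plain Python without Celery: subtask takes the task's positional arguments as a single sequence, and (repo) is just the string repo rather than a one-element tuple, so the string is probably being unpacked character by character. The 50 in the error would then be the length of the first repository string. The helper call_with_args and the example URL below are made up for illustration, and the stand-in run_scan only mirrors the signature of the real task.

```python
def run_scan(repo):
    """Stand-in for the Celery task; only its one-parameter signature matters."""
    return repo


def call_with_args(func, args):
    """Unpack a stored args sequence into the call, as func(*args)."""
    return func(*args)


# Hypothetical repository string, not taken from repositories.lst.
repo = "git@example.com:someuser/somerepo.git"

# Parentheses alone do not create a tuple: (repo) is still the string itself.
assert (repo) is repo

# A one-element tuple needs a trailing comma.
one_arg = (repo,)
assert isinstance(one_arg, tuple) and len(one_arg) == 1

# Unpacking the bare string passes one argument per character,
# which a one-parameter function rejects with a TypeError.
try:
    call_with_args(run_scan, repo)
    error = None
except TypeError as exc:
    error = exc

# Unpacking the one-element tuple passes the whole string as one argument.
result = call_with_args(run_scan, one_arg)
```

If that is what is happening, writing tasks.run_scan.subtask((repo,)) with a trailing comma, or using the shortcut tasks.run_scan.s(repo), should pass each repository as a single argument. It would also explain why the change in Edit 1 made no difference, since (repo) and repo are the same expression.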