11

我需要通过 ftp 下载文件,更改它并上传回来。我正在使用 celery 来执行此操作,但是在尝试使用链接时遇到了问题,我得到了:

TypeError:upload_ftp_image() 正好需要 5 个参数(给定 6 个)

另外,我可以使用链并确保这些步骤是连续的吗?如果不是,还有什么选择?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()

任务:

@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        else:
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)

    return "SUCCESS: "  

@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)

    return "SUCCESS: "

对于我的具体情况,这是好的还是坏的做法?:

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
4

2 回答 2

25

如果您不希望将前一个任务的返回值用作参数,另一种选择是使用“不变性”。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

而不是将您的子任务定义为:

download_ftp_image.s(...) and upload_ftp_image.s(...)

将它们定义为:

download_ftp_image.si(...) and upload_ftp_image.si(...)

现在,您可以在链中使用具有通常数量参数的任务。

于 2013-03-06T08:55:01.433 回答
17

链总是将先前的结果作为第一个参数传递。从链文档

链接的任务将应用其父任务的结果作为第一个参数,在上述情况下将导致mul(4, 16)结果为 4。

您的upload_ftp_image任务不接受这个额外的参数,因此它失败了。

你有一个很好的链接用例;保证在第一个任务完成调用第二个任务(否则无论如何都无法传递结果)。

只需为上一个任务的结果添加一个参数:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory):

您可以利用该结果值;也许使它成为下载方法返回下载文件的路径,以便上传方法知道要上传什么?

于 2013-03-05T13:03:03.373 回答