0

我已经开始与 prefect 合作,我正在尝试将我的结果保存到 Google 云存储:

import prefect
from prefect.engine.results import GCSResult
from prefect.run_configs import DockerRun, LocalRun
from prefect.storage import Docker, Local

@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
    return 1


storage = Local(...)
run_config = LocalRun()

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    results = task1()

flow.run()

如果我将 GOOGLE_APPLICATION_CREDENTIALS 环境变量设置为键,一切正常。

但是,在尝试对流程进行 docker 化时,我遇到了一些困难:

storage = Docker(...)
run_config = DockerRun(dockerfile="DockerFile")

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    ... # Same definition as previously

flow.register()

在这种情况下,当尝试使用 docker 代理运行我的流程时(无论是在注册流程的同一台机器上,我都会收到此错误):

google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials.
Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. 
For more information, please see https://cloud.google.com/docs/authentication/getting-started

按照文档,我试图GCP_CREDENTIALS在我的 Prefect 云上设置一个秘密。但无济于事,我仍然遇到同样的错误。

我也尝试将结果保存在一个单独的GCSUpload任务中,但我仍然遇到同样的错误。

我看到的一种解决方案是通过 DockerFile 在我的 docker 映像中打包凭据,但是我觉得这应该是我应该使用 Prefect 机密的用例。

4

1 回答 1

1

我已经制定了一些使用PrefectSecret任务检索凭据的方法。

我必须创建一个额外的GCSUpload任务,将结果task1直接保存在 GCS 中。

我的最终代码如下所示:


import prefect
from prefect.tasks.gcp.storage import GCSUpload
from prefect.tasks.secrets import PrefectSecret
from prefect.run_configs import DockerRun
from prefect.storage import Docker

retrieve_gcp_credentials = PrefectSecret("GCP_CREDENTIALS")


@prefect.task(checkpoint=True, result=GCSResult(bucket="redacted"))
def task1():
    return "1"

save_results_to_gcp = GCSUpload(bucket="redacted")

storage = Docker()
run_config = DockerRun()

with prefect.Flow(
    "myflow", 
    storage=storage, 
    run_config=run_config
) as flow:
    credentials = retrieve_gcp_credentials()
    results = task1()
    save_results_to_gcp(results, credentials=credentials)

flow.run()

(请注意,我还必须更改返回的值类型task1,因为任务只能上传字符串或字节)

这对我的用例来说已经足够了(只需将结果保存在 GCS 中),但如果有人知道如何使用GCSResult它,我会留下这个问题,因为它对缓存也很有用。

于 2021-01-19T15:54:08.430 回答