我正在尝试flow
使用存储在prefect cloud
其中的秘密来运行coiled
.
错误出flow
有prefect client error
:
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
这是身份验证错误吗?我需要将 a 传递prefect cloud authentication token
给coiled
集群吗?或添加配置以coiled
使用这些选项之一
以下是我使用的代码和配置:
~/.prefect/config.toml
[cloud]
use_local_secrets = false
[flows]
checkpointing = true
[cloud.agent]
# Runner token
auth_token = "<auth_token>"
Flow
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.secrets import PrefectSecret
@task(log_stdout=True)
def add_last_name(first_name, last_name="Smith", pwd=None):
if pwd=="hello":
last_name="Hamilton"
full_name = f"{first_name} {last_name}"
print(full_name)
return full_name
with Flow(name="tst-deploy") as flow:
first_name = Parameter("first_name", default="Alexander")
# Get secret
unlock_nm_secret = PrefectSecret("fake_pwd")
# test prefect secret
full_name = add_last_name(first_name, pwd=unlock_nm_secret)
if __name__ == "__main__":
import coiled
run_se = 1
if run_se == 1:
coiled.create_software_environment(
name="tst-prefect-py38",
conda={"channels": ["conda-forge", "defaults"],
"dependencies": ["python=3.8.8", "numpy", "prefect"]},
post_build=["python -m pip install jupyter-server-proxy"]
)
from prefect.executors import DaskExecutor, LocalExecutor
e = 3
if e==1:
print("Local")
executor = LocalExecutor()
elif e==2:
print("Local Dask Executor")
executor = DaskExecutor()
elif e==3:
print("Coiled")
executor = DaskExecutor(
cluster_class=coiled.Cluster,
cluster_kwargs={
"n_workers":1,
"worker_cpu":1,
"worker_memory":"8 GiB",
"scheduler_memory": "8 GiB",
"software": "grybox/tst-prefect-py38",
"name": "tst-py38",
"shutdown_on_close": False,
}
)
state = flow.run(
executor= executor
)
flow.visualize(flow_state=state)
追溯
[2021-03-16 19:04:44+0000] INFO - prefect.TaskRunner | Task 'fake_pwd': Starting task run...
[2021-03-16 19:04:44+0000] ERROR - prefect.TaskRunner | Unexpected error: ClientError('Malformed response received from Cloud - please ensure that you have an API token properly configured.')
Traceback (most recent call last):
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 465, in _request
json_resp = response.json()
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/conda/envs/coiled/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/conda/envs/coiled/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/conda/envs/coiled/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/tasks/secrets/base.py", line 68, in run
return _Secret(name).get()
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/secrets.py", line 161, in get
raise exc
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/secrets.py", line 145, in get
result = self.client.graphql(
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 298, in graphql
result = self.post(
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 213, in post
response = self._request(
File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 468, in _request
raise ClientError(
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
[2021-03-16 19:04:44+0000] INFO - prefect.TaskRunner | Task 'fake_pwd': Finished task run for task with final state: 'Failed'
先感谢您。