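I have a Flow in Prefect with a task whose output is a DataFrame. In the example below, the task always fails. I would like the task to return an empty DataFrame with a state of SUCCESS, using @task(on_failure=handle_task_fail). What is the correct syntax to achieve this?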
from pprint import pprint

import pandas as pd
from prefect import Flow, task
from prefect.engine.signals import SUCCESS


def handle_disambig_error(task, old_state, new_state):
    if new_state.is_failed():
        new_state.result["wiki_df"] = pd.DataFrame()
        # Is this needed?
        # set state to SUCCESS
    return new_state


@task(on_failure=handle_disambig_error)
def get_wiki_resource():
    wiki_df = pd.DataFrame(
        {
            "a": [1],
            "b": [1/0]
        }
    )
    return wiki_df


with Flow("Always Fail") as flow:
    wiki_df = get_wiki_resource()

state = flow.run()
task_state = state.result[wiki_df]
pprint(task_state.result)
Traceback:
Traceback (most recent call last):
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 120, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/miniconda3/lib/python3.7/site-packages/prefect/utilities/notifications.py", line 69, in state_handler
    fn(obj, new_state)
TypeError: handle_disambig_error() missing 1 required positional argument: 'new_state'
[2020-01-28 17:39:41,759] INFO - prefect.TaskRunner | Task 'get_wiki_resource': finished task run for task with final state: 'Failed'
[2020-01-28 17:39:41,762] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Some places I have searched: State Handlers, Using state handlers for logging
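
For what it's worth, the last frame of the traceback shows the callback being invoked as fn(obj, new_state), that is, with two arguments, while handle_disambig_error declares three. The sketch below reproduces that mismatch in plain Python, without Prefect. call_on_failure is a hypothetical stand-in for the wrapper in prefect/utilities/notifications.py, and the strings stand in for the real task and state objects. It only illustrates the arity problem and says nothing about how to change the task's final state.

```python
# Hypothetical stand-in for the wrapper seen in the traceback, which calls
# the user callback as fn(obj, new_state). This is not Prefect's own code.
def call_on_failure(fn, obj, new_state):
    return fn(obj, new_state)


# Same parameter list as handle_disambig_error in the question.
def three_arg_handler(task, old_state, new_state):
    return new_state


# Two-parameter form that matches the call in the traceback.
def two_arg_handler(task, state):
    return state


# Placeholder strings instead of real Prefect task and state objects.
try:
    call_on_failure(three_arg_handler, "get_wiki_resource", "Failed")
    error_message = None
except TypeError as exc:
    # The two-argument call cannot fill the third parameter.
    error_message = str(exc)

print(error_message)

# The two-parameter callback is called without error.
result = call_on_failure(two_arg_handler, "get_wiki_resource", "Failed")
print(result)
```

If this reading is right, a callback passed to on_failure would need the two-parameter form (task, state). Whether such a callback can also replace the Failed state with SUCCESS is not something the traceback shows.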