The full explanation is here: https://jerryan.medium.com/hacking-ways-to-run-prefect-flow-serverless-in-google-cloud-function-bc6b249126e4.
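Basically, there are two hacky ways to solve this problem:
- Use Google Cloud Storage to persist task states automatically.
- Publish the results of a Cloud Function's previous execution to its next execution.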
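Caching and persisting data
By default, Prefect Core stores all data, results, and cached states in memory inside the Python process that runs the flow. However, if the necessary hooks are configured, they can be persisted to and retrieved from an external location.
Prefect has a concept of "checkpointing": every time a task runs successfully, its return value is written to persistent storage according to the configuration of the task's result object and its target.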
from prefect import task
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir="~/.prefect"), target="task.txt")
def func_task():
    return 99
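Conceptually, a target acts as a cache key: the location template is rendered, and if something already exists at that location, the stored value is returned instead of running the task again. The following is a minimal stdlib-only sketch of that idea, not Prefect's implementation. It assumes a local directory as the store and str.format-style rendering of a template such as "{date:%Y-%m-%d}/{task_name}.txt", with the current time standing in for Prefect's date context value. The checkpointed decorator is hypothetical.

```python
import datetime
import functools
import pickle
import tempfile
from pathlib import Path


def checkpointed(target: str, directory: Path):
    """Hypothetical decorator: store a zero-argument function's result at a rendered target path."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper():
            # Render the template; the current time stands in for Prefect's `date` context value
            location = directory / target.format(
                date=datetime.datetime.now(), task_name=fn.__name__
            )
            if location.exists():
                # Target already exists: load the stored value instead of running fn
                return pickle.loads(location.read_bytes())
            value = fn()
            location.parent.mkdir(parents=True, exist_ok=True)
            location.write_bytes(pickle.dumps(value))
            return value
        return wrapper
    return decorator


calls = []
with tempfile.TemporaryDirectory() as tmp:
    @checkpointed("{date:%Y-%m-%d}/{task_name}.txt", Path(tmp))
    def task1():
        calls.append("ran")
        return "Task 1"

    first, second = task1(), task1()
    written = [p.relative_to(tmp).as_posix() for p in Path(tmp).rglob("*.txt")]
```

Both calls return "Task 1", but the function body runs only once. The second call finds today's target file and loads it. Because the date is part of the template, the cache effectively expires each day, which is the behaviour the date-based targets in the full example rely on.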
The full code example is shown below. Here we use GCSResult.
import os
# Enable checkpointing before importing Prefect, since its config is read at import time
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import task, Flow
from prefect.engine.results import LocalResult, GCSResult


@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task1():
    print("Task 1")
    return "Task 1"


@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task2():
    print("Task 2")
    return "Task 2"


@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task3():
    print("Task 3")
    return "Task 3"


@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task4():
    print("Task 4")


@task
def task5():
    print("Task 5")


@task
def task6():
    print("Task 6")


@task
def task7():
    print("Task 7")


@task
def task8():
    print("Task 8")


# with Flow("This is My First Flow", result=LocalResult(dir="~/prefect")) as flow:
# Tasks without their own result checkpoint to the GCS bucket "prefect"
with Flow("this is my first flow", result=GCSResult(bucket="prefect")) as flow:
    t1, t2 = task1(), task2()
    t3 = task3(upstream_tasks=[t1, t2])
    t4 = task4(upstream_tasks=[t3])
    t5 = task5(upstream_tasks=[t4])
    t6 = task6(upstream_tasks=[t4])
    t7 = task7(upstream_tasks=[t2, t6])
    t8 = task8(upstream_tasks=[t2, t3])

# run the whole flow
flow_state = flow.run()
# visualize the flow (requires graphviz)
flow.visualize(flow_state)
# print the state of each task
print(flow_state.result)
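In a Cloud Function, this setup lets a later invocation on the same day find the targets of tasks that already completed in the bucket and skip them. The following toy model illustrates that resume behaviour using the dependency graph from the flow above. It is a sketch, not Prefect. The in-memory set standing in for the GCS bucket and the simulate_run helper are hypothetical. Tasks without a target (task5 to task8 here) are assumed to always rerun. It requires Python 3.9+ for graphlib.

```python
from graphlib import TopologicalSorter

# Upstream dependencies from the flow above: task -> set of upstream tasks
UPSTREAM = {
    "task1": set(), "task2": set(),
    "task3": {"task1", "task2"},
    "task4": {"task3"},
    "task5": {"task4"}, "task6": {"task4"},
    "task7": {"task2", "task6"},
    "task8": {"task2", "task3"},
}
# Only task1..task4 declare a target in the example
HAS_TARGET = {"task1", "task2", "task3", "task4"}


def simulate_run(store: set) -> list:
    """Return the tasks that actually execute; `store` holds targets already written."""
    executed = []
    for name in TopologicalSorter(UPSTREAM).static_order():
        if name in HAS_TARGET and name in store:
            continue  # target exists: the task is treated as cached and skipped
        executed.append(name)
        if name in HAS_TARGET:
            store.add(name)  # checkpoint the result to its target
    return executed


bucket = set()  # hypothetical stand-in for the GCS bucket
first_run = simulate_run(bucket)
second_run = simulate_run(bucket)
```

The first run executes all eight tasks in dependency order. The second run executes only task5 to task8, because the targets of task1 to task4 already exist.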
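Publishing execution results
Another hacky solution is to publish the results of a Google Cloud Function's previous execution to its next execution. Here we assume that there are no data input/output dependencies between tasks.
A few modifications are needed to make this work:
- Customize the tasks' state handler
- Manually change task states before publishing
- Encode/decode task states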
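First, flow.run returns only after every task has reached a finished state, whether it succeeded or failed. However, we don't want all tasks to run within a single Google Cloud Function invocation, because the total run time may exceed the 540-second limit.
So we attach a custom state handler to the tasks. Once one task has finished, the handler raises Prefect's ENDRUN signal on every subsequent task state change, so the remaining tasks end up in the Cancelled state.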
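from prefect import task, Flow, Task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import State, Cancelled

# Number of tasks that have finished in this process (this invocation)
num_finished = 0

def my_state_handler(obj, old_state, new_state):
    # Attach to each task, e.g. @task(state_handlers=[my_state_handler])
    global num_finished
    # A task has already finished: end this task's run as Cancelled
    if num_finished >= 1:
        raise ENDRUN(state=Cancelled("Flow run is cancelled"))
    if new_state.is_finished():
        num_finished += 1
    return new_state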
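The handler's logic can be stripped of Prefect and modeled as a counter that lets one task finish and turns every later transition into Cancelled. This is a hypothetical model (OneTaskGate and its string states are illustrative, not Prefect classes) to make the control flow explicit.

```python
class OneTaskGate:
    """Hypothetical, Prefect-free model of my_state_handler's counter logic."""

    FINISHED = {"Success", "Failed", "Cancelled"}

    def __init__(self, limit: int = 1) -> None:
        self.limit = limit
        self.num_finished = 0

    def handle(self, new_state: str) -> str:
        # Like `if num_finished >= 1: raise ENDRUN(state=Cancelled(...))`
        if self.num_finished >= self.limit:
            return "Cancelled"
        # Like `if new_state.is_finished(): num_finished += 1`
        if new_state in self.FINISHED:
            self.num_finished += 1
        return new_state

    def reset(self) -> None:
        # Like `num_finished = 0` at the end of run()
        self.num_finished = 0


gate = OneTaskGate()
trace = [
    ("task1", gate.handle("Running")),
    ("task1", gate.handle("Success")),
    ("task2", gate.handle("Running")),
    ("task3", gate.handle("Running")),
]
```

Once task1 reaches Success, every later transition comes back as Cancelled until reset() is called. With the module-level global used above, this reset matters. Cloud Functions can reuse a warm instance, including its module globals, across invocations, so a stale counter would cancel every task in the next invocation.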
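Second, so that tasks left in the Cancelled state run correctly on the next invocation, we have to manually reset their state to Pending. Cancelled counts as a finished state, so Prefect would otherwise treat those tasks as done and skip them.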
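from typing import Dict
from prefect import Task
from prefect.engine.state import State, Cancelled, Pending

def run(task_state_dict: Dict[Task, State]) -> Dict[Task, State]:
    global num_finished
    # `flow` is the Flow built earlier; seed it with the previous task states
    flow_state = flow.run(task_states=task_state_dict)
    task_states = flow_state.result
    # change Cancelled tasks back to Pending before the next publish
    for t in task_states:
        if isinstance(task_states[t], Cancelled):
            task_states[t] = Pending("Mocked pending")
    # reset the global counter (a warm instance keeps module globals)
    num_finished = 0
    # task states for the next run
    return task_states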
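Third, there are two basic functions: encode_data and decode_data. The former serializes the task states so they can be published; the latter deserializes the published data back into task states keyed by the flow's Task objects.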
# encoding: utf-8
from typing import Any, Dict, List

from prefect import Flow, Task
from prefect.engine.state import State


def decode_data(flow: Flow, data: List[Dict[str, Any]]) -> Dict[Task, State]:
    # data as follows:
    # [
    #     {
    #         "task": {
    #             "slug": "task1"
    #         },
    #         "state": {
    #             "type": "Success",
    #             "message": "Task run succeeded(manually set)"
    #         }
    #     }
    # ]
    task_states = {}
    for d in data:
        # look up by slug (get_tasks' first positional parameter is `name`)
        tasks_found = flow.get_tasks(slug=d['task']['slug'])
        if len(tasks_found) != 1:  # skip slugs that don't match exactly one task
            continue
        state = State.deserialize(
            {"message": d['state']['message'],
             "type": d['state']['type']
             }
        )
        task_states[tasks_found[0]] = state
    return task_states


def encode_data(task_states: Dict[Task, State]) -> List[Dict[str, Any]]:
    data = []
    for task, state in task_states.items():
        data.append({
            "task": task.serialize(),
            "state": state.serialize()
        })
    return data
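To actually publish encode_data's output, it has to become a message body and be recovered in the next invocation. Here is a sketch assuming the list is JSON-compatible, and assuming a Python background function triggered by Pub/Sub, where the message body arrives base64-encoded in event["data"]. The helper names to_message_data and from_pubsub_event are hypothetical, and the payload only mirrors the fields decode_data reads.

```python
import base64
import json
from typing import Any, Dict, List


def to_message_data(states: List[Dict[str, Any]]) -> bytes:
    """Turn encode_data() output into bytes for a Pub/Sub message body."""
    return json.dumps(states).encode("utf-8")


def from_pubsub_event(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Recover the list in a Pub/Sub-triggered function (body assumed base64 in event["data"])."""
    if not event.get("data"):
        return []  # no previous state: start the flow from scratch
    return json.loads(base64.b64decode(event["data"]).decode("utf-8"))


payload = [
    {"task": {"slug": "task1"}, "state": {"type": "Success", "message": "Task run succeeded"}},
    {"task": {"slug": "task2"}, "state": {"type": "Pending", "message": "Mocked pending"}},
]
# Emulate the base64 encoding applied to the delivered message body
event = {"data": base64.b64encode(to_message_data(payload)).decode("ascii")}
restored = from_pubsub_event(event)
```

An empty event maps to an empty list, which matches the evt = [] starting point of the local loop.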
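Last but not least, the orchestration ties all of the pieces above together:
import sys
from collections import defaultdict
from typing import Any, Dict, List

def main(data: List[Dict[str, Any]], *args, **kwargs) -> List[Dict[str, Any]]:
    task_states = decode_data(flow, data)
    task_states = run(task_states)
    return encode_data(task_states)

if __name__ == "__main__":
    # Local simulation: each loop iteration stands in for one Cloud Function invocation
    evt = []
    while True:
        data = main(evt)
        # group task slugs by state type
        states = defaultdict(set)
        for task in data:
            task_type, slug = task['state']['type'], task['task']['slug']
            states[task_type].add(slug)
        # no Pending tasks left: the whole flow has finished
        if len(states['Pending']) == 0:
            sys.exit(0)
        evt = data
        # send pubsub message here
        # GooglePubsub().publish(evt)
        # sys.exit(0)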
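The following Prefect-free simulation of the loop above shows how many invocations the publish-and-resume approach needs. It is a sketch under several assumptions: there are no data dependencies beyond ordering (as stated earlier), every task succeeds, and the engine visits tasks in a topological order. invoke is a hypothetical stand-in for main(), combining the one-task gate with the Cancelled to Pending reset. It requires Python 3.9+ for graphlib.

```python
from graphlib import TopologicalSorter
from typing import Dict

# Same dependency graph as the flow above: task -> upstream tasks
UPSTREAM = {
    "task1": set(), "task2": set(),
    "task3": {"task1", "task2"},
    "task4": {"task3"},
    "task5": {"task4"}, "task6": {"task4"},
    "task7": {"task2", "task6"},
    "task8": {"task2", "task3"},
}
ORDER = list(TopologicalSorter(UPSTREAM).static_order())


def invoke(states: Dict[str, str]) -> Dict[str, str]:
    """One simulated invocation: at most one task finishes, the rest are cancelled."""
    new_states = dict(states)
    finished = 0
    for name in ORDER:
        if new_states[name] == "Success":
            continue  # completed in an earlier invocation
        ready = all(new_states[up] == "Success" for up in UPSTREAM[name])
        if finished >= 1 or not ready:
            new_states[name] = "Cancelled"  # what the state-handler gate produces
            continue
        new_states[name] = "Success"
        finished += 1
    # Like run(): reset Cancelled -> Pending before "publishing"
    return {k: ("Pending" if v == "Cancelled" else v) for k, v in new_states.items()}


states = {name: "Pending" for name in UPSTREAM}
invocations = 0
while "Pending" in states.values():  # mirrors the len(states['Pending']) == 0 exit check
    states = invoke(states)
    invocations += 1
```

With a budget of one task per invocation, the flow needs one invocation per task, so eight for this graph. Allowing more tasks per invocation would reduce the number of publishes, at the cost of moving closer to the 540-second limit.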