12

我已经开始在各种项目中使用 Prefect,现在我需要决定在 GCP 上哪种部署策略最有效。最好我想无服务器工作。比较 Cloud Run、Cloud Functions 和 App Engine,我倾向于选择后者,因为它没有超时限制,而其他两个分别有 9 个。15分钟。

我很想听听人们如何无服务器地部署 Prefect 流,这样流被调度/触发以进行批处理,而代理在不使用时会自动缩小。

或者,更经典的方法是在Compute Engine 上部署 Prefect 并通过 Cloud Scheduler 进行调度。但我觉得这有点过时了,对 Prefect 的功能和未来开发的灵活性不公平。

4

3 回答 3

9

我很想听听人们如何无服务器地部署 Prefect 流,这样流被调度/触发以进行批处理,而代理在不使用时会自动缩小。

Prefect 有一篇关于使用 AWS Lambda 进行无服务器部署的博客文章,这是使用 GCP 进行相同操作的一个很好的蓝图。这里的挑战是代理扩展 - 代理通过定期轮询后端(无论是Prefect Server的自我部署还是托管的Prefect Cloud )来工作(每约 10 秒)。想到的一种可能性是使用云函数在进程中启动代理,由您正在考虑的任何批处理/调度事件触发。您还可以使用-max-pollsCLI 参数或 kwarg 来启动代理以查找运行;如果在您指定的多次轮询尝试之后没有找到任何东西,它会自行拆除。此处或任何特定代理页面上的详细信息。

但是,这对于长期运行的流程可能效率低下,并且您可能会达到资源上限;如果工作负载足够高,可能值得考虑触发自动扩展Dask 集群部署。Prefect 原生支持 Kubernetes,并且有一个Kubernetes 代理来与你的集群交互。我认为这将是最优雅和可扩展的解决方案,而不必走经典的 Compute Engine 路线,我同意这有点过时并且不提供出色的自动扩展或一流的管理。

路线图上有对无服务器执行的更好支持,特别是无服务器代理正在开发中,但我没有关于何时发布的 ETA。

希望这会有所帮助!:)

于 2020-04-27T18:27:05.967 回答
2

最近添加到 Prefect 的是Vertex Agent,它使用 AIP 的继承者 GCP Vertex。Vertex 具有高度可配置的无服务器执行环境,并且没有超时。

于 2021-10-28T16:29:11.153 回答
1

完整的解释在这里:https ://jerryan.medium.com/hacking-ways-to-run-prefect-flow-serverless-in-google-cloud-function-bc6b249126e4 。

基本上,有两种黑客方法可以解决这个问题。

  • 使用谷歌云存储自动持久化任务状态
  • 将云函数的先前执行结果发布到其后续执行。

缓存和持久化数据

默认情况下,Prefect Core 将所有数据、结果和缓存状态存储在运行流程的 Python 进程的内存中。但是,如果配置了必要的钩子,它们可以被持久化并从外部位置检索。

Prefect 有一个“检查点”的概念,可确保每次成功运行任务时,其返回值都会根据结果对象和任务目标中的配置写入持久存储。

@task(result=LocalResult(dir="~/.prefect"), target="task.txt") 
def func_task():
    return 99

完整的代码示例如下所示。在这里,我们使用GCSResult.

import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
from prefect import task, Flow  
from prefect.engine.results import LocalResult, GCSResult

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task1():
    print("Task 1")
    return "Task 1"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task2():
    print("Task 2")
    return "Task 2"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task3():
    print("Task 3")
    return "Task 3"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task4():
    print("Task 4")

@task
def task5():
    print("Task 5")

@task
def task6():
    print("Task 6")

@task
def task7():
    print("Task 7")

@task
def task8():
    print("Task 8")
    

# with Flow("This is My First Flow",result=LocalResult(dir="~/prefect")) as flow:
with Flow("this is my first flow", result=GCSResult(bucket="prefect")) as flow:
    t1, t2 = task1(), task2()
    t3 = task3(upstream_tasks=[t1,t2])
    t4 = task4(upstream_tasks=[t3])
    t5 = task5(upstream_tasks=[t4])
    t6 = task6(upstream_tasks=[t4])
    t7 = task7(upstream_tasks=[t2,t6])
    t8 = task8(upstream_tasks=[t2,t3])

# run the whole flow
flow_state = flow.run()

# visualize the flow
flow.visualize(flow_state)

# print the state of the flow
print(flow_state.result)

发布执行结果

另一个黑客解决方案是将谷歌云功能的先前执行结果发布到其后续执行。在这里,我们假设任务之间没有数据输入和输出依赖。

需要进行一些修改才能实现。

  • 更改任务的自定义状态处理程序
  • 发布前手动更改任务状态
  • 编码/解码任务状态

首先,我们知道 flow.run 函数在所有任务进入完成状态后完成,是成功还是失败。但是,我们不希望所有任务都在一次调用 google cloud 函数中运行,因为总运行时间可能超过 540 秒。

因此使用了任务的自定义状态处理程序。每次任务完成时,我们都会向完美框架发出 ENDRUN 信号。然后它将剩余任务的状态设置为已取消。

from prefect import task, Flow, Task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import State, Cancelled

num_finished = 0

def my_state_handler(obj, old_state, new_state):
    global num_finished
    if num_finished >= 1:
        raise ENDRUN(state=Cancelled("Flow run is cancelled"))

    if new_state.is_finished():  
        num_finished += 1
    return new_state

其次,为了使状态为取消的任务下一次正确执行,我们必须手动将其状态更改为待处理。

def run(task_state_dict: Dict[Task, State]) -> Dict[Task, State]:

flow_state = flow.run(task_states=task_state_dict)
task_states = flow_state.result

# change task state before next publish
for t in task_states:
    if isinstance(task_states[t], Cancelled):
        task_states[t] = Pending("Mocked pending")
        

# TODO: reset global counter
global num_finished
num_finished = 0

# task state for next run
return task_states

第三,有两个基本函数:encoding_data 和 decode_data。前者将任务状态序列化以准备发布,后者将任务状态反序列化为流对象。

# encoding: utf-8
from typing import List, Dict, Any
from prefect.engine.state import State
from prefect import Flow, Task


def decode_data(flow: Flow, data: List[Dict[str, Any]]) -> Dict[Task, State]:
    # data as follows:
    # [
    #     {
    #         "task": {
    #               "slug": "task1"
    #          }
    #         "state": {
    #             "type": "Success",
    #             "message": "Task run succeeded(manually set)"
    #         }
    #     }
    # ]

    task_states = {}
    for d in data:

        tasks_found = flow.get_tasks(d['task']['slug'])
        if len(tasks_found) != 1:  # 不唯一就不做处理了
            continue

        state = State.deserialize(
            {"message": d['state']['message'],
             "type": d['state']['type']
             }
        )
        task_states[tasks_found[0]] = state

    return task_states


def encode_data(task_states: Dict[Task, State]) -> List[Dict[str, Any]]:
    data = []
    for task, state in task_states.items():
        data.append({
            "task": task.serialize(),
            "state": state.serialize()
        })
    return data

最后但并非最不重要的一点是,编排连接了上述所有部分。def main(data: List[Dict[str, Any]], *args, **kargs) -> List[Dict[str, Any]]: task_states = decode_data(flow, data) task_states = run(task_states) return encode_data (任务状态)

if __name__ == "__main__":
    evt = []

    while True:
        data = main(evt)

        states = defaultdict(set)
        for task in data:
            task_type, slug = task['state']['type'], task['task']['slug']
            states[task_type].add(slug)
        if len(states['Pending']) == 0:
            sys.exit(0)

        evt = data
        
        # send pubsub message here
        # GooglePubsub().publish(evt)
        # sys.exit(0)
于 2021-12-17T15:09:27.890 回答