1

我正在尝试创建一个接收 PyMySQL 连接实例作为输入的 Prefect 任务,例如:

@task
def connect_db():
    connection = pymysql.connect(user=user,
                                 password=password,
                                 host=host,
                                 port=port,
                                 db=db,
                                 connect_timeout=5,
                                 cursorclass=pymysql.cursors.DictCursor,
                                 local_infile=True)
    return connection


@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows


@task
def get_df(rows) -> Any:
    return pd.DataFrame(rows, dtype=str)


@task
def save_csv(df):
    path = 'mypath'
    df.to_csv(path, sep=';', index=False)


with Flow(FLOW_NAME) as f:
    con = connect_db()
    rows = query_db(con)
    df = get_df(rows)
    save_csv(df)

但是,当我尝试注册结果流时,它会引发“TypeError: cannot pickle 'socket' object”。通过 Prefect 的文档,我发现了内置的 MySQL 任务(https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute),但每次调用时它们都会打开和关闭连接. 有没有办法将以前打开的连接传递给 Prefect Task(或实现连接管理器之类的东西)?

4

1 回答 1

3

我试图复制你的例子,但它注册得很好。出现此类错误的最常见方式是,如果您在流程使用的全局命名空间中有客户端。Prefect 将在注册时尝试对其进行序列化。例如,如果您尝试注册以下代码片段,则会出错:

import pymysql
connection = pymysql.connect(user=user,
                             password=password,
                             host=host,
                             port=port,
                             db=db,
                             connect_timeout=5,
                             cursorclass=pymysql.cursors.DictCursor,
                             local_infile=True)

@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows

with Flow(FLOW_NAME) as f:
    rows = query_db(connection)

此错误是因为connection变量与流对象一起序列化。您可以通过将 Flow 存储为脚本来解决此问题。有关更多信息,请参阅此链接:

https://docs.prefect.io/core/idioms/script-based.html#using-script-based-flow-storage

这将避免 Flow 对象的序列化并在运行时创建该连接。

如果在运行时发生这种情况

如果您在运行时遇到此错误,您可以看到这有两个可能的原因。第一个是 Dask 序列化它,第二个来自 Prefect 检查点。

Dask 用于cloudpickle通过网络将数据发送给工作人员。因此,如果您将 Prefect 与 DaskExecutor 一起使用,它将cloudpickle用于发送任务以供执行。因此,任务输入和输出需要可序列化。在这种情况下,您应该实例化客户端并在任务中执行查询(就像您在当前 MySQL 任务实现中看到的那样)

如果您使用 LocalExecutor,则默认情况下会序列化任务输出,因为默认情况下会启用检查点。checkpoint=False您可以在定义任务时通过做来切换。

如果您需要进一步的帮助,请随时在 prefect.io/slack 加入 Prefect Slack 频道。

于 2021-11-11T20:35:08.840 回答