I am trying to create a Prefect task that receives a PyMySQL connection instance as input, for example:
from typing import Any

import pandas as pd
import pymysql
from prefect import Flow, task

# user, password, host, port, db and FLOW_NAME are defined elsewhere.

@task
def connect_db():
    # Open the connection once so that downstream tasks can reuse it.
    connection = pymysql.connect(user=user,
                                 password=password,
                                 host=host,
                                 port=port,
                                 db=db,
                                 connect_timeout=5,
                                 cursorclass=pymysql.cursors.DictCursor,
                                 local_infile=True)
    return connection

@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows

@task
def get_df(rows) -> Any:
    return pd.DataFrame(rows, dtype=str)

@task
def save_csv(df):
    path = 'mypath'
    df.to_csv(path, sep=';', index=False)

with Flow(FLOW_NAME) as f:
    con = connect_db()
    rows = query_db(con)
    df = get_df(rows)
    save_csv(df)
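However, when I try to register the resulting flow, it raises "TypeError: cannot pickle 'socket' object". Looking through Prefect's documentation, I found the built-in MySQL tasks (https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute), but they open and close the connection every time they are called. Is there a way to pass a previously opened connection to a Prefect task (or to implement something like a connection manager)?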
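
For reference, here is a minimal sketch of what I assume is going on: the connection object holds an open socket, and Python's pickle module refuses to serialize sockets. The snippet below uses only the standard library, with no Prefect or PyMySQL involved; the helper name try_pickle is made up for illustration.

```python
import pickle
import socket


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


# An unconnected socket is enough; no network access is needed.
sock = socket.socket()
try:
    error = try_pickle(sock)
finally:
    sock.close()

# Prints the reason pickle rejected the socket.
print(error)
```

If that assumption is right, any object that wraps a live socket would hit the same problem whenever it has to be serialized between tasks, which is why I am asking about a connection manager rather than a different driver.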