如何使用 dask 并行运行具有不同列维度的 SQL 查询?以下是我的尝试:
from dask.delayed import delayed
from dask.diagnostics import ProgressBar
import dask
ProgressBar().register()
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
@delayed
def loadsql(sql):
return pd.read_sql_query(sql,con)
results = [loadsql(x) for x in sql_to_run]
dask.compute(results)
df1=results[0]
df2=results[1]
df3=results[2]
df4=results[3]
df5=results[4]
df6=results[5]
但是,这会导致引发以下错误:
DatabaseError: sql 执行失败: "SQL QUERY" ORA-01013: 用户请求取消当前操作无法回滚
然后不久之后出现另一个错误:
MultipleInstanceError:正在创建多个不兼容的 TerminalInteractiveShell 子类实例。
sql_to_run 是不同 sql 查询的列表
有什么建议或指点吗??谢谢!
更新 9.7.18
认为这更像是我没有足够仔细地阅读文档的情况。事实上,在 loadsql 函数之外的 con 是导致问题的原因。以下是现在似乎按预期工作的代码更改。
def loadsql(sql):
con = cx_Oracle.connect(user="BLAH",password="BLAH",dsn = "BLAH")
result = pd.read_sql_query(sql,con)
con.close()
return result
values = [delayed(loadsql)(x) for x in sql_to_run]
#MultiProcessing version
import dask.multiprocessing
results = dask.compute(*values, scheduler='processes')
#My sample queries took 56.2 seconds
#MultiThreaded version
import dask.threaded
results = dask.compute(*values, scheduler='threads')
#My sample queries took 51.5 seconds