I have a SQLAlchemy sqlalchemy.sql.selectable.Join object that I created so that I can pull a join of several tables into a dask dataframe.
Join definition:
joined = TABLE1.join(TABLE2, TABLE1.c.COL1 == TABLE2.c.COL2)
joined = joined.outerjoin(TABLE3, TABLE1.c.COL1 == TABLE3.c.COL3)
joined = joined.outerjoin(TABLE4, TABLE1.c.COL1 == TABLE4.c.COL4)
joined = joined.join(TABLE5, TABLE1.c.COL5 == TABLE5.c.COL6)
joined = joined.outerjoin(TABLE6, TABLE5.c.COL7 == TABLE6.c.COL8)
joined = joined.outerjoin(TABLE7, TABLE6.c.COL9 == TABLE7.c.COL10)
If I read a subset straight into pandas like this, it works:
pd_df_join = pd.read_sql_query(
join.select().limit(10000).compile(engine, compile_kwargs={'literal_binds': True}).string, engine, index_col='COL1')
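However, if I try to do the same thing with dask, I get one of two errors, depending on the index_col I pass: either Pandas or Dask cannot find the column I am referring to. Pandas: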
In[15]: dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-15-d824e7a80ef7>", line 1, in <module>
dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 137, in read_sql_table
head = pd.read_sql(q, engine, **kwargs)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 438, in read_sql
chunksize=chunksize,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 1237, in read_query
parse_dates=parse_dates,
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 129, in _wrap_result
frame.set_index(index_col, inplace=True)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/core/frame.py", line 4303, in set_index
raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['SCHEMANAME_TABLE1_COL1'] are in the columns"
Dask:
In[16]: dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-16-b639079b01cd>", line 1, in <module>
dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 110, in read_sql_table
index = table.columns[index_col] if isinstance(index_col, str) else index_col
File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/sqlalchemy/util/_collections.py", line 194, in __getitem__
return self._data[key]
KeyError: 'COL1'
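A minimal sketch of what seems to be behind the two errors, assuming the tables look roughly like the hypothetical two-column stand-ins below (no schema, so the prefix is just the table name): the Join's column collection is keyed by a table-qualified label, while the column itself keeps its plain name, which is the name that shows up in the query result.

```python
import sqlalchemy as sa

metadata = sa.MetaData()

# Hypothetical stand-ins for two of the real tables (assumption: no schema set).
t1 = sa.Table("TABLE1", metadata, sa.Column("COL1", sa.Integer, primary_key=True))
t2 = sa.Table("TABLE2", metadata, sa.Column("COL2", sa.Integer, primary_key=True))

joined = t1.join(t2, t1.c.COL1 == t2.c.COL2)

# The Join exposes its columns under "<table>_<column>" keys,
# so a lookup by the plain name 'COL1' raises KeyError.
join_keys = list(joined.c.keys())
print(join_keys)

# The column object behind that key still carries the plain name.
col = joined.c["TABLE1_COL1"]
print(col.name)
```

If that is right, 'COL1' fails the lookup on the Join's columns, while the qualified key passes that lookup but does not match the column name that pandas sees in the result.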
I'm not sure whether there is a way around this, or whether I am going about it the wrong way. Any help is greatly appreciated!