
I have a SQLAlchemy sqlalchemy.sql.selectable.Join object that I created so that I can pull a join of several tables into a Dask dataframe.

Join definition:

join = TABLE1.join(TABLE2, TABLE1.c.COL1 == TABLE2.c.COL2)
join = join.outerjoin(TABLE3, TABLE1.c.COL1 == TABLE3.c.COL3)
join = join.outerjoin(TABLE4, TABLE1.c.COL1 == TABLE4.c.COL4)
join = join.join(TABLE5, TABLE1.c.COL5 == TABLE5.c.COL6)
join = join.outerjoin(TABLE6, TABLE5.c.COL7 == TABLE6.c.COL8)
join = join.outerjoin(TABLE7, TABLE6.c.COL9 == TABLE7.c.COL10)

If I read a subset directly into pandas like this, it works:

pd_df_join = pd.read_sql_query(
    join.select().limit(10000).compile(engine, compile_kwargs={'literal_binds': True}).string, engine, index_col='COL1')

However, if I try to do the same thing with Dask, I hit one of two errors: either pandas or Dask cannot find the column I'm referring to. Pandas:

In[15]: dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-15-d824e7a80ef7>", line 1, in <module>
    dd_df_join = dd.read_sql_table(join, engine_uri, index_col='SCHEMANAME_TABLE1_COL1', limits=(1,10000), npartitions=1)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 137, in read_sql_table
    head = pd.read_sql(q, engine, **kwargs)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 438, in read_sql
    chunksize=chunksize,
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 1237, in read_query
    parse_dates=parse_dates,
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/io/sql.py", line 129, in _wrap_result
    frame.set_index(index_col, inplace=True)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/pandas/core/frame.py", line 4303, in set_index
    raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['SCHEMANAME_TABLE1_COL1'] are in the columns"

Dask:

In[16]: dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
Traceback (most recent call last):
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-16-b639079b01cd>", line 1, in <module>
    dd_df_join2 = dd.read_sql_table(join, engine_uri, index_col='COL1', limits=(1,10000), npartitions=1)
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 110, in read_sql_table
    index = table.columns[index_col] if isinstance(index_col, str) else index_col
  File "/home/akettmann/venvs/tableau-extract/lib/python3.7/site-packages/sqlalchemy/util/_collections.py", line 194, in __getitem__
    return self._data[key]
KeyError: 'COL1'

I'm not sure whether there is a way around this, or whether I'm going about it the wrong way. Any help is greatly appreciated!


1 Answer


Your input should be just the SQL expression; don't compile it, and don't mix expressions and strings.

In this case, it might look like

pd_df_join = dd.read_sql_table(join, engine_uri, index_col=TABLE1.c.COL1)
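Spelled out a little more, a minimal sketch of that call might look like the following, assuming a dask version whose read_sql_table accepts a SQLAlchemy selectable as its first argument (join, engine_uri and TABLE1 are the objects from the question):

import dask.dataframe as dd

dd_df_join = dd.read_sql_table(
    join,                     # the uncompiled Join/selectable, not a compiled SQL string
    engine_uri,               # the plain connection URI string, not an Engine object
    index_col=TABLE1.c.COL1,  # pass the Column element itself, not its string name
    npartitions=1,
)
dd_df_join.head()             # runs a small query to check that the columns resolve

Passing the Column element for index_col sidesteps the table.columns[index_col] name lookup that raised the KeyError in your second traceback.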
answered 2020-03-16T19:01:11.030