0

我正在使用 kedro.extras.datasets.pandas.SQLTableDataSet 并想使用 pandas 的 chunk_size 参数。但是,在运行管道时,表被视为生成器而不是 pd.dataframe()。

您将如何在管道中使用 chunk_size?

我的目录:

table_name:
  type: pandas.SQLTableDataSet
  credentials: redshift
  table_name : rs_table_name
  layer: output
  save_args:
    if_exists: append
    schema: schema.name
    chunk_size: 1000
4

1 回答 1

0

查看最新的pandas文档,实际kwarg使用的是chunksize,而不是chunk_size。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html。因为kedro只包装你save_args并将它们传递给pd.DataFrame.to_sql这些需要匹配:

def _save(self, data: pd.DataFrame) -> None:
    try:
        data.to_sql(**self._save_args)
    except ImportError as import_error:
        raise _get_missing_module_error(import_error) from import_error
    except NoSuchModuleError as exc:
        raise _get_sql_alchemy_missing_error() from exc

编辑:一旦你在你的管道中工作,文档显示pandas.DataFrame.read_sqlwith chunksizeset 将返回 type Iterator[DataFrame]。这意味着在您的节点函数中,您应该遍历输入(并在适当的情况下进行相应的注释),例如:

def my_node_func(input_dfs: Iterator[pd.DataFrame], *args):
  for df in input_dfs:
    ...

这适用于最新版本的pandas. 但是,我注意到这是pandas对齐 API 以便set 返回 a from所以我希望这种更改也会发生。read_csvchunksizeContextManagerpandas>=1.2read_sql

于 2021-05-13T17:06:44.903 回答