0

我正在使用 Kedro 管理数据管道,在最后一步我有一个巨大的 csv 文件存储在 S3 存储桶中,我需要将其加载回 SQL Server。

我通常会使用批量插入来解决这个问题,但不太确定如何将其放入kedro模板中。这是目标表和 S3 存储桶,如在catalog.yml

flp_test:
  type: pandas.SQLTableDataSet
  credentials: dw_dev_credentials
  table_name: flp_tst
  load_args:
    schema: 'dwschema'
  save_args:
    schema: 'dwschema'
    if_exists: 'replace'

bulk_insert_input:
   type: pandas.CSVDataSet
   filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
   credentials: dev_s3


def insert_data(self, conn, csv_file_nm, db_table_nm):
    qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
    # Execute the query
    cursor = conn.cursor()
    success = cursor.execute(qry)
    conn.commit()
    cursor.close
  • 如何指向csv_file_nm我的bulk_insert_inputS3 目录?
  • 是否有适当的方法来间接访问dw_dev_credentials插入?
4

1 回答 1

1

Kedro 的pandas.SQLTableDataSet.html按原样使用pandas.to_sql方法。要按原样使用它,您需要一个pandas.CSVDataSetinto anode然后写入目标pandas.SQLDataTable数据集以便将其写入 SQL。如果你有 Spark 可用,这将比 Pandas 更快。

为了使用内置BULK INSERT查询,我认为您需要定义一个自定义数据集

于 2021-07-13T13:28:20.783 回答