
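I need to append some new data to an existing table in Snowflake. I am using sqlalchemy as the engine together with the pandas DataFrame method to_sql(). Here are the imports and the script: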

import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
import snowflake.sqlalchemy
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy.ext.declarative import declarative_base

registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

# `data` (the survey DataFrame) and `nextval` (the next survey id) are not shown in this snippet
columns_df = pd.DataFrame(data.columns.to_list(), columns=['survey_column_name'])
columns_df['survey_id'] = nextval
columns_df = columns_df[['survey_id', 'survey_column_name']]
columns_df.to_sql('SURVEY_METADATA_COLUMN_NAMES',
                  index=False,
                  index_label=None,
                  con=engine,
                  schema='PUBLIC',
                  if_exists='append',
                  chunksize=300,
                  method=pd_writer)

The error I get is as follows:

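ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090105 (22000): Cannot perform CREATE TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.
[SQL:
CREATE TABLE "PUBLIC"."SURVEY_METADATA_COLUMN_NAMES" (
    survey_id INTEGER,
    survey_column_name TEXT
)
]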

The connection is set up as follows:

user = input('Your Snowflake username: ')
password = getpass.getpass('Your Snowflake Password: ')
account = 'MY_ACCOUNT'
conn = snowCtx.connect(
    user=user,
    password=password,
    account=account,
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    role='SYSADMIN'
)

engine = create_engine(
    'snowflake://{user}:{password}@{account}/'.format(
        user=user,
        password=password,
        account=account,
        database='MY_DB',
        schema = 'PUBLIC',
        warehouse='COMPUTE_WH',
        role='SYSADMIN',
        cache_column_metadata=True
    )
)

1 Answer

I switched to using write_pandas() instead:

# `filename` is not shown in this snippet
success, nchunks, nrows, _ = write_pandas(conn,
                                          columns_df,
                                          'SURVEY_METADATA_COLUMN_NAMES',
                                          chunk_size=300,
                                          schema='PUBLIC')
print(success, nchunks, nrows)
if success:
    print(filename + ' columns uploaded')
else:
    print(filename + ' columns were not uploaded')

This requires the pyarrow library, so I installed it with:

pip install pyarrow

I removed all the sqlalchemy-related imports and kept the following:

import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from snowflake.connector.pandas_tools import write_pandas
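
For anyone who would rather keep to_sql(), a likely cause of the original error is that the engine URL template in the question only has placeholders for user, password and account, so the database, schema, warehouse and role passed to format() never reach the URL. The sketch below uses only the standard library to show the difference. The credential values are placeholders, and the fuller template follows the connection-string layout described in the snowflake-sqlalchemy documentation; I have not re-run the original script with it, so treat it as an assumption to verify.

```python
from urllib.parse import quote_plus

# Placeholder values mirroring the question; none of these are real credentials.
params = dict(
    user='my_user',
    password='p@ss/word',
    account='MY_ACCOUNT',
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    role='SYSADMIN',
)

# Template from the question: str.format() silently ignores keyword
# arguments that have no matching placeholder, so database, schema,
# warehouse and role are dropped from the resulting URL.
original_url = 'snowflake://{user}:{password}@{account}/'.format(**params)

# Template with a placeholder for every setting.
full_template = (
    'snowflake://{user}:{password}@{account}/{database}/{schema}'
    '?warehouse={warehouse}&role={role}'
)

# URL-encode user and password so characters such as '@' or '/'
# cannot be mistaken for URL delimiters.
safe_params = dict(
    params,
    user=quote_plus(params['user']),
    password=quote_plus(params['password']),
)
full_url = full_template.format(**safe_params)

print(original_url)  # no database in the URL
print(full_url)      # database, schema, warehouse and role included
```

The second string is what would be passed to create_engine(). The snowflake.sqlalchemy package also ships a URL helper that builds this string from keyword arguments, which avoids hand-written templates altogether.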
answered 2020-11-24T17:29:09.577