I am trying to bulk load data from the UK Companies House CSV file into PostgreSQL using Python.

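I convert each row into a dictionary, collect the dictionaries in a list, and then use unnest statements to unpack the data in a single bulk SQL statement. Here is an example of what I am doing (the source has more fields):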

import six

# lookups and set_to_null are the asker's own helpers (not shown here).
def buildDict(row):
    # Python 2 csv rows are byte strings, hence the decode.
    clean_name = row[0].decode('utf-8').upper()
    country_code = lookups.getCountryCodeFromName(row[14])
    if len(country_code) > 2:
        country_code = None

    insert_dict = {
        'companyname': row[0],
        'companynumber': row[1],
        'regaddress_careof': row[2],
        'regaddress_pobox': row[3],
        'dissolutiondate': row[13],
    }

    # convert 'None' and '' strings to None
    for k, v in six.iteritems(insert_dict):
        insert_dict[k] = set_to_null(v)

    return insert_dict


def fastInsert(data):
    sql='''
        INSERT INTO uk_data.companies_house(
          companyname,
          companynumber,
          regaddress_careof,
          regaddress_pobox,
          dissolutiondate
          )
          SELECT
          unnest( %(companyname)s ),
          unnest( %(companynumber)s ),
          unnest( %(regaddress_careof)s ),
          unnest( %(regaddress_pobox)s ),
          unnest( %(dissolutiondate)s )
          ;
    '''

    companyname=[str(r['companyname']) for r in data]
    companynumber=[str(r['companynumber']) for r in data]
    regaddress_careof=[str(r['regaddress_careof']) for r in data]
    regaddress_pobox=[str(r['regaddress_pobox']) for r in data]
    dissolutiondate=[datetime.strptime(r['dissolutiondate'], "%d/%m/%Y") if r['dissolutiondate'] else None for r in data]
    execute(sql,locals())


def execute(sql,params={}):
    with connect() as connection:
        with connection.cursor() as cursor:
            if params:
                cursor.execute(sql,params)
            else:
                cursor.execute(sql)

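As long as everything is converted to a string, this code works fine. But when I try to convert the data to dates, I get the following error whenever a date record has no value (note that the conditional sets this value to None, so it should load into PostgreSQL as NULL).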

Error could not determine polymorphic type because input has type "unknown"

I tried adding a ::DATE cast to the unnest statement, as follows:

sql='''
    INSERT INTO uk_data.companies_house(
      companyname,
      companynumber,
      regaddress_careof,
      regaddress_pobox,
      dissolutiondate
      )
      SELECT
      unnest( %(companyname)s ),
      unnest( %(companynumber)s ),
      unnest( %(regaddress_careof)s ),
      unnest( %(regaddress_pobox)s ),
      unnest( %(dissolutiondate)s )::DATE
      ;
'''

But that did not help. Printing my locals shows the following for a single record:

('these are my locals: ', {'regaddress_posttown': ['LEEDS'], 'regaddress_addressline1': ['METROHOUSE 57 PEPPER ROAD'], 'regaddress_addressline2': ['HUNSLET'], 'regaddress_careof': ['None'], 'companystatus': ['Active'], 'companycategory': ['Private Limited Company'], 'companyname': ['! LTD'], 'countryoforigin': ['None'], 'regaddress_pobox': ['None'], 'regaddress_country': ['None'], 'dissolutiondate': None, 'regaddress_postcode': ['LS10 2RU'], 'regaddress_county': ['YORKSHIRE'], 'sql': '
        INSERT INTO uk_data.companies_house(
          companyname,
          companynumber,
          regaddress_careof,
          regaddress_pobox,
          regaddress_addressline1,
          regaddress_addressline2,
          regaddress_posttown,
          regaddress_county,
          regaddress_country,
          regaddress_postcode,
          companycategory,
          companystatus,
          countryoforigin,
          dissolutiondate
          )
          SELECT
          unnest( %(companyname)s ),
          unnest( %(companynumber)s ),
          unnest( %(regaddress_careof)s ),
          unnest( %(regaddress_pobox)s ),
          unnest( %(regaddress_addressline1)s ),
          unnest( %(regaddress_addressline2)s ),
          unnest( %(regaddress_posttown)s ),
          unnest( %(regaddress_county)s ),
          unnest( %(regaddress_country)s ),
          unnest( %(regaddress_postcode)s ),
          unnest( %(companycategory)s ),
          unnest( %(companystatus)s ),
          unnest( %(countryoforigin)s ),
          unnest( %(dissolutiondate)s )
          ;
    ', 'r': {'regaddress_posttown': 'LEEDS', 'regaddress_careof': None, 'companystatus': 'Active', 'companynumber': '08209948', 'regaddress_addressline1': 'METROHOUSE 57 PEPPER ROAD', 'regaddress_addressline2': 'HUNSLET', 'companycategory': 'Private Limited Company', 'companyname': '! LTD', 'countryoforigin': None, 'regaddress_pobox': None, 'regaddress_country': None, 'dissolutiondate': None, 'regaddress_postcode': 'LS10 2RU', 'regaddress_county': 'YORKSHIRE'}, 'data': [{'regaddress_posttown': 'LEEDS', 'regaddress_careof': None, 'companystatus': 'Active', 'companynumber': '08209948', 'regaddress_addressline1': 'METROHOUSE 57 PEPPER ROAD', 'regaddress_addressline2': 'HUNSLET', 'companycategory': 'Private Limited Company', 'companyname': '! LTD', 'countryoforigin': None, 'regaddress_pobox': None, 'regaddress_country': None, 'dissolutiondate': None, 'regaddress_postcode': 'LS10 2RU', 'regaddress_county': 'YORKSHIRE'}], 'companynumber': ['08209948']})

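I am not sure whether this is relevant, but I noticed that the local variables, once taken out of the dict, are all wrapped in a list, e.g. ['None'], whereas the date variable causing the problem (dissolutiondate) ends up as a real None value.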

1 Answer

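OK. So the problem turned out to be the way psycopg2 and PostgreSQL interact when handling arrays. There used to be a bug in psycopg2 that prevented arrays of null values from being imported into Postgres, as described here: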

https://github.com/psycopg/psycopg2/issues/285

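As Vao Tsun pointed out, the solution is to cast the argument of each unnest call. The cast must be explicit, and it must also include [] brackets after each data type specifier.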
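As a minimal sketch of that rule, the casts can be generated from a list of column and array type pairs instead of being typed by hand. The `COLUMN_TYPES` list and the `build_unnest_insert` helper are hypothetical names introduced here for illustration, and only three of the columns are shown.

```python
# Hypothetical (column, PostgreSQL array type) pairs; adjust to the real table.
COLUMN_TYPES = [
    ("companyname", "TEXT[]"),
    ("companynumber", "TEXT[]"),
    ("dissolutiondate", "TIMESTAMP[]"),
]


def build_unnest_insert(table, column_types):
    """Build an INSERT ... SELECT with one explicitly cast unnest() per column.

    Table and column names are interpolated directly, so they must be
    trusted constants, never user input.
    """
    columns = ",\n  ".join(name for name, _ in column_types)
    selects = ",\n  ".join(
        "unnest( %({0})s::{1} )".format(name, pg_type)
        for name, pg_type in column_types
    )
    return "INSERT INTO {0} (\n  {1}\n)\nSELECT\n  {2};".format(
        table, columns, selects
    )


sql = build_unnest_insert("uk_data.companies_house", COLUMN_TYPES)
print(sql)
```

The resulting string still uses named placeholders such as %(companyname)s, so it can be passed to cursor.execute together with a dict of lists.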

I was also wrongly converting my variables to strings in Python here:

companyname=[str(r['companyname']) for r in data]

This caused None values to be converted to the string 'None'.
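To make the pitfall concrete, here is a small sketch with made-up rows (the `rows` data is illustrative, not taken from the real file). It shows that str() turns a missing value into the text 'None', while leaving the value untouched keeps a real None, which psycopg2 sends as SQL NULL.

```python
# Illustrative rows only; the real data comes from the Companies House CSV.
rows = [
    {"regaddress_pobox": None},
    {"regaddress_pobox": "PO BOX 12"},
]

# str() hides the missing value behind the four-character string 'None'.
as_strings = [str(r["regaddress_pobox"]) for r in rows]

# Leaving the values untouched keeps None intact.
as_values = [r["regaddress_pobox"] for r in rows]

print(as_strings)  # ['None', 'PO BOX 12']
print(as_values)   # [None, 'PO BOX 12']
```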

Here is an example of the correct code:

SELECT
          unnest( %(companyname)s::TEXT[] ),
          unnest( %(companynumber)s::TEXT[] ),
          unnest( %(regaddress_careof)s::TEXT[] ),
          unnest( %(regaddress_pobox)s::TEXT[] ),
          unnest( %(regaddress_addressline1)s::TEXT[] ),
          unnest( %(regaddress_addressline2)s::TEXT[] ),
          unnest( %(regaddress_posttown)s::TEXT[] ),
          unnest( %(regaddress_county)s::TEXT[] ),
          unnest( %(regaddress_country)s::TEXT[] ),
          unnest( %(regaddress_postcode)s::TEXT[] ),
          unnest( %(companycategory)s::TEXT[] ),
          unnest( %(companystatus)s::TEXT[] ),
          unnest( %(countryoforigin)s::TEXT[] ),
          unnest( %(dissolutiondate)s::TIMESTAMP[] )

    companyname=[(r['companyname']) for r in data]
    companynumber=[(r['companynumber']) for r in data]
    regaddress_careof=[(r['regaddress_careof']) for r in data]
    regaddress_pobox=[(r['regaddress_pobox']) for r in data]
    regaddress_addressline1=[(r['regaddress_addressline1']) for r in data]
    regaddress_addressline2=[(r['regaddress_addressline2']) for r in data]
    regaddress_posttown=[(r['regaddress_posttown']) for r in data]
    regaddress_county=[(r['regaddress_county']) for r in data]
    regaddress_country=[(r['regaddress_country']) for r in data]
    regaddress_postcode=[(r['regaddress_postcode']) for r in data]
    companycategory=[(r['companycategory']) for r in data]
    companystatus=[(r['companystatus']) for r in data]
    countryoforigin=[(r['countryoforigin']) for r in data]
    dissolutiondate=[datetime.strptime(r['dissolutiondate'], "%d/%m/%Y") if r['dissolutiondate'] else None for r in data]
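The per-column list comprehensions above can also be written once as a loop. The following is only a sketch under assumptions: `parse_uk_date` and `rows_to_columns` are hypothetical helper names, and the sample rows are made up. The resulting dict of lists has the same shape as the parameters passed to the unnest query.

```python
from datetime import datetime


def parse_uk_date(value):
    """Parse a 'dd/mm/YYYY' string; return None for empty or missing values."""
    return datetime.strptime(value, "%d/%m/%Y") if value else None


def rows_to_columns(rows, names, converters=None):
    """Turn a list of row dicts into a dict of per-column lists.

    converters optionally maps a column name to a function applied to
    every value in that column; other columns are passed through as-is,
    so None stays None.
    """
    converters = converters or {}
    columns = {}
    for name in names:
        convert = converters.get(name, lambda v: v)
        columns[name] = [convert(row[name]) for row in rows]
    return columns


# Made-up sample rows for illustration.
sample = [
    {"companyname": "! LTD", "dissolutiondate": None},
    {"companyname": "EXAMPLE LTD", "dissolutiondate": "26/01/2017"},
]
params = rows_to_columns(
    sample,
    ["companyname", "dissolutiondate"],
    converters={"dissolutiondate": parse_uk_date},
)
```

Passing an explicit dict like `params` to execute, rather than locals(), also avoids sending unrelated local variables along with the query parameters.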
answered 2017-01-26T08:41:52.587