1

现有数据库和预期结果:

我有一个更大的 SQLite 数据库(12gb,超过 4400 万行的表),我想在 Python3 中使用 Pandas 对其进行修改。

示例目标:我希望将这些大表中的一个(4400 万行)以块的形式读入 DF,操作 DF 块,并将结果写入新表。如果可能的话,如果新表存在,我想替换它,并将每个块附加到它。

因为我的操作只添加或修改列,所以新表应该与原始表具有相同的行数。

问题:

主要问题似乎源于以下代码中的以下行:

df.to_sql(new_table, con=db, if_exists = "append", index=False)

  1. 当在下面的代码中运行这一行时,我似乎一直得到一个额外的 size=N 块,加上一个比我预期的观察结果。
  2. 此代码第一次使用新表名运行时,出现错误:
 Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
  1. 如果我然后重新运行脚本,使用相同的新表名,它将为每个块运行,以及一个额外的块,+1 行。

  2. 当该df.to_sql()行被注释掉时,循环运行预期的块数。

带有完整代码的问题测试示例:

完整代码:example.py

import pandas as pd
import sqlite3

#Helper Functions Used in Example
def ren(invar, outvar, df):
    df.rename(columns={invar:outvar}, inplace=True)
    return(df)

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)

for df in df_generator:
    #Functions to modify data, example
    df = ren("name", "renamed_name", df)
    print(df.shape)
    df.to_sql(new_table, con=db, if_exists = "append", index=False)


#Count if new table is created
try:
    count_result(c, new_table)
except:
    pass

1. 结果当 #df.to_sql(new_table, con=db, if_exists = "append", index=False)

(问题行已注释掉):

$ python3 example.py 
(5000, 22)
(5000, 22)

我期望这是因为示例代码将我的大表限制为 10k 行。

2. 结果当 df.to_sql(new_table, con=db, if_exists = "append", index=False)

一个。问题行没有被注释掉

湾。这是第一次使用 new_table 运行代码:

$ python3 example.py 
(5000, 22)
Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database

3. 结果当 df.to_sql(new_table, con=db, if_exists = "append", index=False)

一个。问题行没有被注释掉

湾。上面的代码使用 new_table第二次运行:

$ python3 example.py 
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table

因此,我首先遇到的问题是第一次运行时代码中断(结果 2),其次,第二次运行时的总行数(结果 3)是我预期的两倍多。

任何有关如何解决此问题的建议将不胜感激。

4

3 回答 3

1

您可以尝试指定:

db = sqlite3.connect("test.db", isolation_level=None)
#  ---->                        ^^^^^^^^^^^^^^^^^^^^

除此之外,您可能会尝试增加您的块大小,因为否则提交之间的时间会缩短 SQLite DB - 我猜这会导致此错误......我还建议使用 PostgreSQL、MySQL/MariaDB 或类似的东西 -它们更可靠,更适合这种数据库大小......

于 2018-02-28T23:21:36.360 回答
1

上述解决方案的时间延迟

@MaxU 添加isolation_level=None到数据库连接的解决方案简短而甜蜜。然而,无论出于何种原因,它都会显着减慢将每个块写入/提交到数据库的速度。例如,当我在 1200 万行的表上测试解决方案时,代码需要 6 个多小时才能完成。相反,从几个文本文件构建原始表格需要几分钟时间。

这种洞察导致了一个更快但不太优雅的解决方案,在 1200 万行的表上完成该解决方案需要不到 7 分钟,而不是超过 6 小时。输出行与输入行匹配,解决了我原来问题中的问题。

更快但不太优雅的解决方案

由于从文本文件/csv 文件构建原始表并使用 SQL 脚本加载数据,我将这种方法与 Panda 的块功能相结合。基本的基本步骤如下:

  1. 连接到数据库
  2. 使用 SQL 脚本创建一个新表(列和顺序应该与你对 pandas df 所做的任何事情相匹配)
  3. 分块读取海量表格
  4. 对于每个块,根据需要修改 df,写入 csv,使用 sql 加载 csv,然后提交更改。

解决方案主要代码:

import pandas as pd
import sqlite3

#Note I Used Functions I Wrote in build_db.py
#(shown below after example solution)
from build_db import *


#Helper Functions Used in Example
def lower_var(var, df):
    s = df[var].str.lower()
    df = df.drop(var, axis=1)
    df = pd.concat([df, s], axis=1)
    return(df)


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()

#create statement
create_table(c, "create_test.sql", path='sql_clean/')

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)

for df in df_generator:
    #functions to modify data, example
    df = lower_var("name", df) #changes column order

    #restore df to column order in sql table
    db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
        "entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
        "transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
    df = df[db_order]

    #write chunk to csv
    file = "df_chunk.csv"
    df.to_csv(file, sep='|', header=None, index=False)

    #insert chunk csv to db
    insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
    db.commit()


#Count results
count_result(c, "test_indiv")

上述代码的导入用户函数

#Relavant Functions in build_db.py

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])

def create_table(cursor, sql_script, path='sql/'):
    print("[*] create table with {}{}".format(path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    cursor.executescript(qry)


def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
    print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    fileObj = open(file, 'rU', encoding='latin-1')
    csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')

    try:
        for row in csvReader:
            try:
                cursor.execute(qry, row)
            except sqlite3.IntegrityError as e:
                pass

    except Exception as e:
        print("[*] error while processing file: {}, error code: {}".format(file, e))
        print("[*] sed replacing null bytes in file: {}".format(file))
        sed_replace_null(file, "clean_null.sh")
        subprocess.call("bash clean_null.sh", shell=True)

        try:
            print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
            fileObj = open(file, 'rU', encoding='latin-1')
            csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
            for row in csvReader:
                try:
                    cursor.execute(qry, row)
                except sqlite3.IntegrityError as e:
                    pass
                    print(e)    

        except Exception as e:
            print("[*] error while processing file: {}, error code: {}".format(file, e))

SQL 用户脚本

--create_test.sql

DROP TABLE if exists test_indiv;

CREATE TABLE test_indiv (
    cmte_id TEXT NOT NULL,
    amndt_ind TEXT,
    rpt_tp TEXT,
    transaction_pgi TEXT,
    image_num TEXT,
    transaction_tp TEXT,
    entity_tp TEXT,
    name TEXT,
    city TEXT,
    state TEXT,
    zip_code TEXT,
    employer TEXT,
    occupation TEXT,
    transaction_dt TEXT,
    transaction_amt TEXT,
    other_id TEXT,
    tran_id TEXT,
    file_num NUMERIC,
    memo_cd TEXT,
    memo_text TEXT,
    sub_id NUMERIC NOT NULL
);

CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);
--insert_test.sql

INSERT INTO test_indiv (
    cmte_id,
    amndt_ind,
    rpt_tp,
    transaction_pgi,
    image_num,
    transaction_tp,
    entity_tp,
    name,
    city,
    state,
    zip_code,
    employer,
    occupation,
    transaction_dt,
    transaction_amt,
    other_id,
    tran_id,
    file_num,
    memo_cd,
    memo_text,
    sub_id
    ) 
VALUES (
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?
);
于 2018-03-01T19:41:11.573 回答
0

遇到了完全相同的问题(处理 > 30 GB 的数据)。以下是我解决问题的方法:而不是使用 read_sql 的 Chunk 功能。我决定像这样创建一个手动块循环器:

chunksize=chunk_size
offset=0
for _ in range(0, a_big_number):
    query = "SELECT * FROM the_table %s offset %s" %(chunksize, offset)
    df = pd.read_sql(query, conn)
    if len(df)!=0:
        ....
    else:
        break
于 2018-08-10T22:40:39.773 回答