5

我一直在阅读关于不向数据库添加重复记录的 pandas to_sql 解决方案。我正在处理日志的 csv 文件,每次我上传一个新的日志文件时,我都会读取数据并使用 pandas 创建一个新的数据框进行一些更改。然后我执行to_sql('Logs',con = db.engine, if_exists = 'append', index=True)if_exists arg i 确保每次从新文件中创建的新数据框都附加到现有数据库中。问题是它不断添加重复值。我想确保如果一个已经上传的文件被错误地再次上传,它不会被附加到数据库中。我想在创建数据库时直接尝试这样做,而没有找到解决方法,比如检查文件名是否以前被使用过。

我正在使用烧瓶-sqlalchemy。

谢谢你。

4

1 回答 1

4

最好的办法是通过将索引设置为主键来捕获重复项,然后使用try/except来捕获唯一性违规。您提到了另一篇建议注意IntegrityError异常的帖子,我同意这是最好的方法。您可以将其与重复数据删除功能结合使用,以确保您的表更新运行顺利。

证明问题

这是一个玩具示例:

from sqlalchemy import *
import sqlite3

# make a database, 'test', and a table, 'foo'.
conn = sqlite3.connect("test.db")
c = conn.cursor()
# id is a primary key.  this will be the index column imported from to_sql().
c.execute('CREATE TABLE foo (id integer PRIMARY KEY, foo integer NOT NULL);')
# use the sqlalchemy engine.
engine = create_engine('sqlite:///test.db')

pd.read_sql("pragma table_info(foo)", con=engine)

   cid name     type  notnull dflt_value  pk
0    0   id  integer        0       None   1
1    1  foo  integer        1       None   0

现在,两个示例数据框dfdf2

data = {'foo':[1,2,3]}
df = pd.DataFrame(data)
df
   foo
0    1
1    2
2    3

data2 = {'foo':[3,4,5]}
df2 = pd.DataFrame(data2, index=[2,3,4])
df2
   foo
2    3       # this row is a duplicate of df.iloc[2,:]
3    4
4    5

移入df表格foo

df.to_sql('foo', con=engine, index=True, index_label='id', if_exists='append')

pd.read_sql('foo', con=engine)
   id  foo
0   0    1
1   1    2
2   2    3

现在,当我们尝试追加时df2,我们捕获了IntegrityError

try:
    df2.to_sql('foo', con=engine, index=True, index_label='id', if_exists='append')
# use the generic Exception, both IntegrityError and sqlite3.IntegrityError caused trouble.
except Exception as e: 
    print("FAILURE TO APPEND: {}".format(e))

输出:

FAILURE TO APPEND: (sqlite3.IntegrityError) UNIQUE constraint failed: foo.id [SQL: 'INSERT INTO foo (id, foo) VALUES (?, ?)'] [parameters: ((2, 3), (3, 4), (4, 5))]

建议的解决方案

在 上IntegrityError,您可以拉取现有表数据,删除新数据的重复条目,然后重试 append 语句。用于apply()此:

def append_db(data):
    try:
        data.to_sql('foo', con=engine, index=True, index_label='id', if_exists='append')
        return 'Success'
    except Exception as e:
        print("Initial failure to append: {}\n".format(e))
        print("Attempting to rectify...")
        existing = pd.read_sql('foo', con=engine)
        to_insert = data.reset_index().rename(columns={'index':'id'})
        mask = ~to_insert.id.isin(existing.id)
        try:
            to_insert.loc[mask].to_sql('foo', con=engine, index=False, if_exists='append')
            print("Successful deduplication.")
        except Exception as e2:
            "Could not rectify duplicate entries. \n{}".format(e2)
        return 'Success after dedupe'

df2.apply(append_db)

输出:

Initial failure to append: (sqlite3.IntegrityError) UNIQUE constraint failed: foo.id [SQL: 'INSERT INTO foo (id, foo) VALUES (?, ?)'] [parameters: ((2, 3), (3, 4), (4, 5))]

Attempting to rectify...
Successful deduplication.

foo    Success after dedupe
dtype: object
于 2017-09-02T21:23:09.837 回答