我有一个可以在各种不同的数据库引擎中实现的数据库模式(假设我将使用 pyodbc 连接到一个 MS Access 数据库,或者我将通过内置 sqlite3 模块连接到一个 SQLite 数据库作为简单的例子)。
我想创建一个工厂函数/方法,它根据某些参数返回适当类型的数据库连接,类似于以下内容:
def createConnection(connType, params):
if connType == 'sqlite':
return sqlite3.connect(params['filename'])
elif connType == 'msaccess':
return pyodbc.connect('DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ={};'.format(params['filename']))
else:
# do something else
现在我有一些查询代码应该适用于任何连接类型(因为无论底层数据库引擎如何架构都是相同的),但可能会引发我需要捕获的异常:
db = createDatabase(params['dbType'], params)
cursor = db.cursor()
try:
cursor.execute('SELECT A, B, C FROM TABLE')
for row in cursor:
print('{},{},{}'.format(row.A, row.B, row.C))
except DatabaseError as err:
# Do something...
我遇到的问题是每个 DB API 2.0 实现中的 DatabaseError 类不共享一个公共基类(除了过于通用的异常),所以我不知道如何一般地捕获这些异常。显然,我可以执行以下操作:
try:
# as before
except sqlite3.DatabaseError as err:
# do something
except pyodbc.DatabaseError as err:
# do something again
...我为每个可能的数据库引擎包含了一个显式的 catch 块。但这对我来说似乎显然不是pythonic。
如何从不同的底层 DB API 2.0 数据库实现中捕获 DatabaseErrors?