(解决方案由 OP 编辑成问题)
我已经解决了这个问题。这就是我所做的。
去./site-packages/sqlalchemy/dialects
将任何具体方言复制到新方言(例如:命名 zeta)作为起点。更好的方法是使用
from sqlalchemy.engine.default import DefaultDialect
class ZetaDialect(DefaultDialect):
...
将 zeta 添加__all__
到./site-packages/sqlalchemy/dialects/__init__.py
创建一个测试程序:
from sqlalchemy import create_engine
engine = create_engine('zeta://XXX')
result = engine.execute("select * from table_name")
for row in result:
print(row)
运行它并得到错误。使用 pdb 查找原因。大多数情况下,原因是没有实现一些接口。一一解决。
当测试程序给出正确答案时,它几乎已经完成了 90%。为了完整起见,我们还应该实现检查员使用的几个接口:
class ZetaDialect(DefaultDialect):
# default_paramstyle = 'qmark'
name = 'zeta'
def __init__(self, **kwargs):
DefaultDialect.__init__(self, **kwargs)
@classmethod
def dbapi(cls):
return zeta_dbapi
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
return [u'table_1', u'table_2', ...]
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
return []
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
return []
@reflection.cache
def get_unique_constraints(self, connection, table_name,
schema=None, **kw):
return []
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
return []
@reflection.cache
def get_schema_names(self, connection, **kw):
return []
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
# just an example of the column structure
result = connection.execute('select * from %s limit 1' % table_name)
return [{'default': None, 'autoincrement': False, 'type': TEXT, 'name': colname, 'nullable': False} for colname, coltype in result.cursor.description]