4

我正在尝试向 Superset(一个数据探索平台)添加一个特殊的数据源。本数据库仅支持HTTP API,返回json格式数据;例如:

> http://localhost/api/sql/query?q="select * from table"
< [{"id": 1, "value":10}, {"id": 2, "value": 30} ...]

因此,我必须在 python SQLAlchemy 中为 Superset 编写自己的适配器。我已经阅读了文档和部分源代码,但仍然需要很好的例子来学习。

4

1 回答 1

2

(解决方案由 OP 编辑​​成问题)

我已经解决了这个问题。这就是我所做的。

  1. ./site-packages/sqlalchemy/dialects

  2. 将任何具体方言复制到新方言(例如:命名 zeta)作为起点。更好的方法是使用

    from sqlalchemy.engine.default import DefaultDialect
    class ZetaDialect(DefaultDialect):
        ...
    
  3. 将 zeta 添加__all__./site-packages/sqlalchemy/dialects/__init__.py

  4. 创建一个测试程序:

    from sqlalchemy import create_engine
    engine = create_engine('zeta://XXX')
    result = engine.execute("select * from table_name")
    for row in result:
        print(row)
  1. 运行它并得到错误。使用 pdb 查找原因。大多数情况下,原因是没有实现一些接口。一一解决。

  2. 当测试程序给出正确答案时,它几乎已经完成了 90%。为了完整起见,我们还应该实现检查员使用的几个接口:

    class ZetaDialect(DefaultDialect):
        # default_paramstyle = 'qmark'
        name = 'zeta'

        def __init__(self, **kwargs):
            DefaultDialect.__init__(self, **kwargs)

        @classmethod
        def dbapi(cls):
            return zeta_dbapi

        @reflection.cache
        def get_table_names(self, connection, schema=None, **kw):
            return [u'table_1', u'table_2', ...]

        @reflection.cache
        def get_pk_constraint(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_foreign_keys(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_unique_constraints(self, connection, table_name,
                                   schema=None, **kw):
            return []

        @reflection.cache
        def get_indexes(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_schema_names(self, connection, **kw):
            return []

        @reflection.cache
        def get_columns(self, connection, table_name, schema=None, **kw):
            # just an example of the column structure
            result = connection.execute('select * from %s limit 1' % table_name)
            return [{'default': None, 'autoincrement': False, 'type': TEXT, 'name': colname, 'nullable': False} for colname, coltype in result.cursor.description]
于 2020-04-09T11:53:57.510 回答