0

我正在开发一个多租户系统,任何数量的租户都可以注册系统并管理他们的数据。每当租户注册时,都会创建一个新的表分区,其分区名称以他的 id 结尾,他将管理 company_id 等于他的租户 id 的数据。还有一些不需要分区的所有租户都通用的表。/create/company使用以下代码在路由中完成分区创建

        ...
        db.session.add(company)
        db.session.commit()
        # tables to be partitioned
        for table_name in ['export_import_data_rel', 'fields_company_rel', 'export', 'import_data', 'import_logs']:
            db.engine.execute("CREATE TABLE {table_name}_{company_id} 
                               PARTITION OF {table_name}
                               FOR VALUES IN ('{company_id}');".format(table_name=table_name, company_id=company.id))

这可以很好地为每个租户创建新分区。但是当我执行flask db migrateandflask db upgrade时,租户注册时创建的那些分区被检测为已删除的表,import_data_1因为.import_data_2application/models.py

所以我试图在 中创建分区以及分区表application/models.py,以便它们也被检测到。我正在使用这个问题中提到的 mixin 类和元类来完成它。

在我的application/__init__.py, 以前的

from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy() 

现在变成

from sqlalchemy.ext.declarative import declarative_base
from flask_sqlalchemy.model import Model
from sqlalchemy.ext.declarative import DeclarativeMeta

class PartitionByMeta(DeclarativeMeta):
    def __new__(cls, clsname, bases, attrs):

        @classmethod
        def get_partition_name(cls_, key):
            return f'{cls_.__tablename__}_{key}'

        @classmethod
        def create_partition(cls_, key, _partition_type="LIST", _partition_range=(), *args, **kwargs):
            if key not in cls_.partitions:
                Partition = type(
                    f'{clsname}{key}',  # Class name, only used internally
                    bases,
                    {'__tablename__': cls_.get_partition_name(key)}
                )
                Partition.__table__.add_is_dependent_on(cls_.__table__)
                event.listen(
                    Partition.__table__,
                    'after_create',
                    DDL(
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {Partition.__tablename__}
                        FOR VALUES IN ('{_partition_range}');
                        """
                    )
                    if _partition_type == "LIST" else
                    DDL(
                        f"""
                        ALTER TABLE {cls_.__tablename__}
                        ATTACH PARTITION {Partition.__tablename__}
                        FOR VALUES FROM ('{_partition_range[0]}') TO ('{_partition_range[1]}');
                        """
                    )
                )
                cls_.partitions[key] = Partition
            return cls_.partitions[key]

        attrs.update(
            {   # tables to be partitioned will be sending it in __table_args__, so I'm commenting __table_args__ and partitioned_by
                # '__table_args__': attrs.get('__table_args__', ()) + (dict(postgresql_partition_by=f'RANGE({partition_by})'),),
                # 'partitioned_by': partition_by,
                'partitions': {},
                'get_partition_name': get_partition_name,
                'create_partition': create_partition
            }
        )
        return super().__new__(cls, clsname, bases, attrs)

db = SQLAlchemy(model_class=declarative_base(cls=Model, metaclass=PartitionByMeta, name='Model'))

application/models.py这是我需要按company_id字段分区的表的示例

class ImportData(db.Model):
    id = db.Column(UUID(as_uuid=True), default=uuid.uuid4, nullable=False, index=True)
    company_id = db.Column(db.Integer, db.ForeignKey('res_company.id'), index=True)
    export_ids = db.relationship(
        'Export',
        secondary=export_import_data_rel, lazy='dynamic', backref=db.backref('import'),
        primaryjoin="and_(ImportData.id == foreign(export_import_data_rel.c.res_id), ImportData.company_id == foreign(export_import_data_rel.c.company_id))",
        secondaryjoin="and_(Export.id == foreign(export_import_data_rel.c.export_id), Export.company_id == export_import_data_rel.c.company_id)")
    business_name = db.Column(db.String(), index=True)
    phone = db.Column(db.String(), index=True)
    address = db.Column(db.String(), index=True)
    upserted_by = db.Column(db.Integer, db.ForeignKey('res_users.id'))
    upsert_date = db.Column(db.DateTime, default=datetime.utcnow)

    __table_args__ = (PrimaryKeyConstraint('id', 'company_id'), {"postgresql_partition_by": 'LIST (company_id)'})

将使用 mixins 和元类重写

class ImportDataMixin:
    id = db.Column(UUID(as_uuid=True), default=uuid.uuid4, nullable=False, index=True)
    business_name = db.Column(db.String(), index=True)
    phone = db.Column(db.String(), index=True)
    address = db.Column(db.String(), index=True)
    upsert_date = db.Column(db.DateTime, default=datetime.utcnow)

    @declared_attr
    def company_id(cls):
        return db.Column(db.Integer, db.ForeignKey('res_company.id'), index=True)

    @declared_attr
    def upserted_by(cls):
        return db.Column(db.Integer, db.ForeignKey('res_users.id'))

    @declared_attr
    def export_ids(cls):
        return db.relationship(
            'Export',
            secondary=export_import_data_rel, lazy='dynamic', backref=db.backref('import'),
            primaryjoin="and_(ImportData.id == foreign(export_import_data_rel.c.res_id), ImportData.company_id == foreign(export_import_data_rel.c.company_id))",
            secondaryjoin="and_(Export.id == foreign(export_import_data_rel.c.export_id), Export.company_id == export_import_data_rel.c.company_id)")

    __table_args__ = (PrimaryKeyConstraint('id', 'company_id'), {"postgresql_partition_by": 'LIST (company_id)'})

class ImportData(ImportDataMixin, db.Model, metaclass=PartitionByMeta):
    __tablename__ = 'import_data'

现在我正在尝试为import_data表添加 2 个分区

for i in range(0, 2):
    Partition = ImportData.create_partition(key=i + 1, _partition_type="LIST", _partition_range=(i + 1))

我给flask db migrateflask db upgrade一个新的数据库。但它不起作用;创建的架构是

CREATE TABLE public.import_data_1 (
    id uuid NOT NULL,
    business_name character varying,
    phone character varying,
    address character varying,
    upsert_date timestamp without time zone,
    company_id integer NOT NULL,
    upserted_by integer
)
PARTITION BY LIST (company_id);

import_data_2 也是如此。

此外,我还有其他未定义类的表,例如

export_import_data_rel = db.Table('export_import_data_rel',
       db.Column('company_id', db.Integer, primary_key=True),
       db.Column('export_id', UUID(as_uuid=True), primary_key=True),
       db.Column('res_id', UUID(as_uuid=True), primary_key=True),
       ForeignKeyConstraint(['export_id', 'company_id'], ['export.id', 'export.company_id'], ondelete="CASCADE"),
       ForeignKeyConstraint(['res_id', 'company_id'], ['import_data.id', 'import_data.company_id'], ondelete="CASCADE"),
       postgresql_partition_by='LIST (company_id)')

为了为这些表创建分区,我尝试过

event.listen(
    export_import_data_rel,
    "after_create",
    DDL("""ALTER TABLE export_import_data_rel ATTACH PARTITION export_import_data_rel_1 FOR VALUES IN ('1');""")
)

但它不会创建任何表。

所以我的问题是

  1. 如何正确对表进行分区,以便不使用 flask db migrate 和 upgrade 命令删除分区?这种方式甚至可能吗?每次我重新启动服务器时,分区代码都会运行吗?还有其他方法吗?
  2. 如何为没有类定义的表添加分区?

任何帮助表示赞赏。谢谢

4

0 回答 0