我正在开发一个多租户系统,任何数量的租户都可以注册系统并管理他们的数据。每当租户注册时,都会创建一个新的表分区,其分区名称以他的 id 结尾,他将管理 company_id 等于他的租户 id 的数据。还有一些不需要分区的所有租户都通用的表。/create/company
使用以下代码在路由中完成分区创建
...
db.session.add(company)
db.session.commit()
# tables to be partitioned
for table_name in ['export_import_data_rel', 'fields_company_rel', 'export', 'import_data', 'import_logs']:
db.engine.execute("CREATE TABLE {table_name}_{company_id}
PARTITION OF {table_name}
FOR VALUES IN ('{company_id}');".format(table_name=table_name, company_id=company.id))
这可以很好地为每个租户创建新分区。但是当我执行flask db migrate
andflask db upgrade
时,租户注册时创建的那些分区被检测为已删除的表,import_data_1
因为.import_data_2
application/models.py
所以我试图在 中创建分区以及分区表application/models.py
,以便它们也被检测到。我正在使用这个问题中提到的 mixin 类和元类来完成它。
在我的application/__init__.py
, 以前的
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
现在变成
from sqlalchemy.ext.declarative import declarative_base
from flask_sqlalchemy.model import Model
from sqlalchemy.ext.declarative import DeclarativeMeta
class PartitionByMeta(DeclarativeMeta):
def __new__(cls, clsname, bases, attrs):
@classmethod
def get_partition_name(cls_, key):
return f'{cls_.__tablename__}_{key}'
@classmethod
def create_partition(cls_, key, _partition_type="LIST", _partition_range=(), *args, **kwargs):
if key not in cls_.partitions:
Partition = type(
f'{clsname}{key}', # Class name, only used internally
bases,
{'__tablename__': cls_.get_partition_name(key)}
)
Partition.__table__.add_is_dependent_on(cls_.__table__)
event.listen(
Partition.__table__,
'after_create',
DDL(
f"""
ALTER TABLE {cls_.__tablename__}
ATTACH PARTITION {Partition.__tablename__}
FOR VALUES IN ('{_partition_range}');
"""
)
if _partition_type == "LIST" else
DDL(
f"""
ALTER TABLE {cls_.__tablename__}
ATTACH PARTITION {Partition.__tablename__}
FOR VALUES FROM ('{_partition_range[0]}') TO ('{_partition_range[1]}');
"""
)
)
cls_.partitions[key] = Partition
return cls_.partitions[key]
attrs.update(
{ # tables to be partitioned will be sending it in __table_args__, so I'm commenting __table_args__ and partitioned_by
# '__table_args__': attrs.get('__table_args__', ()) + (dict(postgresql_partition_by=f'RANGE({partition_by})'),),
# 'partitioned_by': partition_by,
'partitions': {},
'get_partition_name': get_partition_name,
'create_partition': create_partition
}
)
return super().__new__(cls, clsname, bases, attrs)
db = SQLAlchemy(model_class=declarative_base(cls=Model, metaclass=PartitionByMeta, name='Model'))
application/models.py
这是我需要按company_id
字段分区的表的示例
class ImportData(db.Model):
id = db.Column(UUID(as_uuid=True), default=uuid.uuid4, nullable=False, index=True)
company_id = db.Column(db.Integer, db.ForeignKey('res_company.id'), index=True)
export_ids = db.relationship(
'Export',
secondary=export_import_data_rel, lazy='dynamic', backref=db.backref('import'),
primaryjoin="and_(ImportData.id == foreign(export_import_data_rel.c.res_id), ImportData.company_id == foreign(export_import_data_rel.c.company_id))",
secondaryjoin="and_(Export.id == foreign(export_import_data_rel.c.export_id), Export.company_id == export_import_data_rel.c.company_id)")
business_name = db.Column(db.String(), index=True)
phone = db.Column(db.String(), index=True)
address = db.Column(db.String(), index=True)
upserted_by = db.Column(db.Integer, db.ForeignKey('res_users.id'))
upsert_date = db.Column(db.DateTime, default=datetime.utcnow)
__table_args__ = (PrimaryKeyConstraint('id', 'company_id'), {"postgresql_partition_by": 'LIST (company_id)'})
将使用 mixins 和元类重写
class ImportDataMixin:
id = db.Column(UUID(as_uuid=True), default=uuid.uuid4, nullable=False, index=True)
business_name = db.Column(db.String(), index=True)
phone = db.Column(db.String(), index=True)
address = db.Column(db.String(), index=True)
upsert_date = db.Column(db.DateTime, default=datetime.utcnow)
@declared_attr
def company_id(cls):
return db.Column(db.Integer, db.ForeignKey('res_company.id'), index=True)
@declared_attr
def upserted_by(cls):
return db.Column(db.Integer, db.ForeignKey('res_users.id'))
@declared_attr
def export_ids(cls):
return db.relationship(
'Export',
secondary=export_import_data_rel, lazy='dynamic', backref=db.backref('import'),
primaryjoin="and_(ImportData.id == foreign(export_import_data_rel.c.res_id), ImportData.company_id == foreign(export_import_data_rel.c.company_id))",
secondaryjoin="and_(Export.id == foreign(export_import_data_rel.c.export_id), Export.company_id == export_import_data_rel.c.company_id)")
__table_args__ = (PrimaryKeyConstraint('id', 'company_id'), {"postgresql_partition_by": 'LIST (company_id)'})
class ImportData(ImportDataMixin, db.Model, metaclass=PartitionByMeta):
__tablename__ = 'import_data'
现在我正在尝试为import_data
表添加 2 个分区
for i in range(0, 2):
Partition = ImportData.create_partition(key=i + 1, _partition_type="LIST", _partition_range=(i + 1))
我给flask db migrate
了flask db upgrade
一个新的数据库。但它不起作用;创建的架构是
CREATE TABLE public.import_data_1 (
id uuid NOT NULL,
business_name character varying,
phone character varying,
address character varying,
upsert_date timestamp without time zone,
company_id integer NOT NULL,
upserted_by integer
)
PARTITION BY LIST (company_id);
import_data_2 也是如此。
此外,我还有其他未定义类的表,例如
export_import_data_rel = db.Table('export_import_data_rel',
db.Column('company_id', db.Integer, primary_key=True),
db.Column('export_id', UUID(as_uuid=True), primary_key=True),
db.Column('res_id', UUID(as_uuid=True), primary_key=True),
ForeignKeyConstraint(['export_id', 'company_id'], ['export.id', 'export.company_id'], ondelete="CASCADE"),
ForeignKeyConstraint(['res_id', 'company_id'], ['import_data.id', 'import_data.company_id'], ondelete="CASCADE"),
postgresql_partition_by='LIST (company_id)')
为了为这些表创建分区,我尝试过
event.listen(
export_import_data_rel,
"after_create",
DDL("""ALTER TABLE export_import_data_rel ATTACH PARTITION export_import_data_rel_1 FOR VALUES IN ('1');""")
)
但它不会创建任何表。
所以我的问题是
- 如何正确对表进行分区,以便不使用 flask db migrate 和 upgrade 命令删除分区?这种方式甚至可能吗?每次我重新启动服务器时,分区代码都会运行吗?还有其他方法吗?
- 如何为没有类定义的表添加分区?
任何帮助表示赞赏。谢谢