0

我为每个表使用类模型,并为每个使用 SQLAlchemy 的 sql 操作提供方法。

我安装 pgp_sym_encrypt 和 pgp_sym_decrypt 与 PostgreSQL 的扩展

CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;

在 SQLAlchemy 文档的帮助下,我使用了我的类 User,我在 pgp_sym_encrypt 的帮助下加密了数据,PostgreSQL 的包 pgcrypto 的功能(我使用版本 13.4)

为了安全起见,我对用户 i 的数据进行加密我想使用 postgreSQL 的原生 pgp_sym_encrypt 和 pgp_sym_decrypt 原生函数。在我使用纯文本查询与 psycopg2 进行此操作之前。由于我使用的是 SQLAlchemy ORM,因此通过它进行 Flask-SQLALchemy 扩展。

我找到了解决方案: https ://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing

但是它们都没有显示在查询中使用它们的示例

实际上我的代码如下:

import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY

db = SQLAlchemy()


class PGPString(TypeDecorator):
    impl = BYTEA

    cache_ok = True

    def __init__(self, passphrase):
        super(PGPString, self).__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        # convert the bind's type from PGPString to
        # String, so that it's passed to psycopg2 as is without
        # a dbapi.Binary wrapper
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)
    
    
class User(db.Model):
    __tablename__ = "user"
    __table_args__ = {'schema': 'private'}
    id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sexe = Column("sexe", String)
    username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
    firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
    lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
    email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
    password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
    registration_date = Column("registration_date", DateTime)
    validation_date = Column("validation_date", DateTime)
    role = Column("role", String)

    @classmethod
    def create_user(cls, **kw):
        """
        Create an User
        """
        obj = cls(**kw)
        db.session.add(obj)
        db.session.commit()

一切正常,当我使用我的 create_user 方法时,用户数据已加密他的数据。

但是,如果在我的课程中我添加了一个方法来检查用户是否存在,通过此方法检查用户名、电子邮件或其他数据:

class User(db.Model)
    ....
    @classmethod
    def check_user_exist(cls, **kwargs):
        """
        Check if user exist
        :param kwargs:
        :return:
        """
        exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
        return exists

这将产生以下查询:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role

FROM private."user";

WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
  AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
  AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);

这个查询返回None是因为当我们使用带有实际 ENCRYPTION_KEY 的 pgp_sym_encrypt 时,数据将以一种新的方式加密。

https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8 https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg

那么如何修改 SQLAlchemy 文档函数PGPString(TypeDecorator):以使用 pgp_sym_decrypt 来解密列而不是加密变量,如下面的查询:

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role
FROM private."user"
WHERE
      pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
    AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)  = 'email'
    AND   pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';

通常我现在用 Python 和 SQLAlchemy 做得很好,但这真的超出了我的技能,我真的不知道如何修改这个函数,因为它会影响我完全不知道的功能。

目前我使用文本功能,但这很丑

例如,从用户名(加密数据)获取 id 的另一种方法

    @classmethod
    def get_id(cls, username):
        query = """
        SELECT private.user.id
        FROM private."user"
        WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
        """
        q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
                                                                     username=username).first()
        return q.id

非常感谢你的帮助 !此致

4

1 回答 1

0

通过添加解决:

class PGPString(TypeDecorator):
    (...)
    def pgp_sym_decrypt(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase, type_=String)

到 PGPString 并使用如下:

class User(db.session):
    (...)
    @classmethod
    def check_user_exist(cls, **kw):
        exists = db.session.query(cls).with_entities(cls.username)\
            .filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
            .filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
            .filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
            .first() is not None
    return exist

感谢 sqlalchemy 团队: https ://github.com/sqlalchemy/sqlalchemy/discussions/7268

于 2021-11-05T15:24:10.803 回答