如何更新行的信息?
例如,我想更改 id 为 5 的行的名称列。
使用Flask-SQLAlchemy 文档中显示的教程检索对象。拥有要更改的实体后,请更改实体本身。那么,db.session.commit()
。
例如:
admin = User.query.filter_by(username='admin').first()
admin.email = 'my_new_email@example.com'
db.session.commit()
user = User.query.get(5)
user.name = 'New Name'
db.session.commit()
Flask-SQLAlchemy 基于 SQLAlchemy,因此请务必查看SQLAlchemy 文档。
update
SQLAlchemy 中的 BaseQuery 对象上有一个方法,由filter_by
.
num_rows_updated = User.query.filter_by(username='admin').update(dict(email='my_new_email@example.com')))
db.session.commit()
update
当有许多对象要更新时,使用而不是更改实体的优势就来了。
如果您想add_user
授予所有admin
s 的权限,
rows_changed = User.query.filter_by(role='admin').update(dict(permission='add_user'))
db.session.commit()
请注意,filter_by
接受关键字参数(仅使用一个=
),而不是filter
接受表达式。
如果您修改模型的腌制属性,这将不起作用。应该替换腌制属性以触发更新:
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
from pprint import pprint
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqllite:////tmp/users.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), unique=True)
data = db.Column(db.PickleType())
def __init__(self, name, data):
self.name = name
self.data = data
def __repr__(self):
return '<User %r>' % self.username
db.create_all()
# Create a user.
bob = User('Bob', {})
db.session.add(bob)
db.session.commit()
# Retrieve the row by its name.
bob = User.query.filter_by(name='Bob').first()
pprint(bob.data) # {}
# Modifying data is ignored.
bob.data['foo'] = 123
db.session.commit()
bob = User.query.filter_by(name='Bob').first()
pprint(bob.data) # {}
# Replacing data is respected.
bob.data = {'bar': 321}
db.session.commit()
bob = User.query.filter_by(name='Bob').first()
pprint(bob.data) # {'bar': 321}
# Modifying data is ignored.
bob.data['moo'] = 789
db.session.commit()
bob = User.query.filter_by(name='Bob').first()
pprint(bob.data) # {'bar': 321}
除了 JSON 和 Pickled 属性之外,只需分配值并提交它们即可适用于所有数据类型。由于上面解释了腌制类型,我将记下一种稍微不同但简单的更新 JSON 的方法。
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), unique=True)
data = db.Column(db.JSON)
def __init__(self, name, data):
self.name = name
self.data = data
假设模型如上所示。
user = User("Jon Dove", {"country":"Sri Lanka"})
db.session.add(user)
db.session.flush()
db.session.commit()
这会将用户添加到 MySQL 数据库中,数据为 {"country":"Sri Lanka"}
修改数据将被忽略。我的代码不起作用如下。
user = User.query().filter(User.name=='Jon Dove')
data = user.data
data["province"] = "south"
user.data = data
db.session.merge(user)
db.session.flush()
db.session.commit()
与其经历将 JSON 复制到新 dict 的痛苦工作(而不是像上面那样将其分配给新变量),这应该可行,我找到了一种简单的方法来做到这一点。有一种方法可以标记 JSON 已更改的系统。
以下是工作代码。
from sqlalchemy.orm.attributes import flag_modified
user = User.query().filter(User.name=='Jon Dove')
data = user.data
data["province"] = "south"
user.data = data
flag_modified(user, "data")
db.session.merge(user)
db.session.flush()
db.session.commit()
这就像一个魅力。与此方法一起提出了另一种方法, 希望我对某些人有所帮助。
Models.py 定义序列化器
def default(o):
if isinstance(o, (date, datetime)):
return o.isoformat()
def get_model_columns(instance,exclude=[]):
columns=instance.__table__.columns.keys()
columns=list(set(columns)-set(exclude))
return columns
class User(db.Model):
__tablename__='user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
.......
####
def serializers(self):
cols = get_model_columns(self)
dict_val = {}
for c in cols:
dict_val[c] = getattr(self, c)
return json.loads(json.dumps(dict_val,default=default))
在 RestApi 中,我们可以通过将 json 数据传递给更新查询来动态更新记录:
class UpdateUserDetails(Resource):
@auth_token_required
def post(self):
json_data = request.get_json()
user_id = current_user.id
try:
instance = User.query.filter(User.id==user_id)
data=instance.update(dict(json_data))
db.session.commit()
updateddata=instance.first()
msg={"msg":"User details updated successfully","data":updateddata.serializers()}
code=200
except Exception as e:
print(e)
msg = {"msg": "Failed to update the userdetails! please contact your administartor."}
code=500
return msg
我正在寻找比@Ramesh 的回答(很好)但仍然充满活力的东西。这是一个将更新方法附加到 db.Model 对象的解决方案。
你传入一个字典,它只会更新你传入的列。
class SampleObject(db.Model):
id = db.Column(db.BigInteger, primary_key=True)
name = db.Column(db.String(128), nullable=False)
notes = db.Column(db.Text, nullable=False)
def update(self, update_dictionary: dict):
for col_name in self.__table__.columns.keys():
if col_name in update_dictionary:
setattr(self, col_name, update_dictionary[col_name])
db.session.add(self)
db.session.commit()
然后在一条路线中你可以做
object = SampleObject.query.where(SampleObject.id == id).first()
object.update(update_dictionary=request.get_json())