我有一个具有多对多关系的用户和角色模型
class User(BaseModel, TimestampableMixin):
username = Column(String(MEDIUM_STRING_LENGTH), nullable=False, unique=True)
roles = relationship('Role', secondary='user_roles', back_populates='users')
class Role(BaseModel, TimestampableMixin):
label = Column(String(MEDIUM_STRING_LENGTH), nullable=False, unique=True)
users = relationship('User', secondary='user_roles', back_populates='roles')
class UserRole(BaseModel, TimestampableMixin):
user_id = Column(ForeignKey('users.id', ondelete=CASCADE, onupdate=CASCADE), nullable=False, index=True)
role_id = Column(ForeignKey('roles.id', onupdate=CASCADE), nullable=False, index=True)
然后我为用户定义了模式来嵌套角色。
class RoleSchema(BaseSchema):
class Meta:
model = models.Role
class UserSchema(BaseSchema):
class Meta:
model = models.User
roles = fields.Nested('RoleSchema', many=True, exclude=['users'])
对于序列化,当角色对象列表包含在用户 GET 请求中时,这非常有用。还可以使用在请求中嵌入新角色对象的用户发布。我无法弄清楚的是如何发布/放置现有角色ID的列表,而不是创建新对象。例如,此请求有效:
{
"username": "testuser12",
"roles": [
{
"label": "newrole"
}
]
}
回复:
{
"createdTime": "2020-02-06T19:13:29Z",
"id": 4,
"modifiedTime": "2020-02-06T19:13:29Z",
"roles": [
{
"createdTime": "2020-02-06T19:13:29Z",
"id": 2,
"label": "newrole",
"modifiedTime": "2020-02-06T19:13:29Z"
}
],
"username": "testuser12"
}
但是这些请求都不起作用:
{
"username": "testuser13",
"roles": [
1
]
}
{
"username": "testuser13",
"roles": [
{
"id": 1
}
]
}
我收到以下回复:
{
"errors": {
"error": [
"Unprocessable Entity"
],
"message": [
"The request was well-formed but was unable to be followed due to semantic errors."
]
}
}
我可以说我在模式中遗漏了一些东西以便能够摄取 id 而不是对象,我怀疑我需要使用 dump_only/load_only 并可能为 PUT 使用单独的模式。但是,我无法在网上的任何地方找到这个用例的示例。
提及我flask-smorest
用于请求验证和模式参数摄取也可能会有所帮助。
@B_API.route('/user/<user_id>')
class UserByIdResource(MethodView):
@B_API.response(schemas.UserSchema)
def get(self, user_id):
"""
Get a single user by id
"""
return models.User.query.get(user_id)
@B_API.arguments(schemas.UserSchema)
@B_API.response(schemas.UserSchema)
def put(self, updated_user, user_id):
"""
Update fields of an existing user
"""
models.User.query.get_or_404(user_id, description=f'User with id {user_id} not found')
user = updated_user.update_with_db(user_id)
return user
update_with_db 看起来像:
def update_with_db(self, id: int):
self.id = id
DB.session.merge(self)
DB.session.commit()
return self.query.get(id)
感谢您的任何帮助。