除非我有点空白(这是可能的......杜松子酒......)这不能直接在关系上使用关联代理来完成,因为 a1.context 需要是一个集合,其中每个元素都有一个唯一的键,那么集合可以是一个字典 - 但这里没有这样的集合,因为 a1 可以有许多具有相同键的 Context 对象。Assoc prox 将对象集合简化为每个成员对象上的属性集合的简单方法不适用于此。
因此,如果您真的想要这个,并且您的结构无法更改,只需执行关联代理所做的事情,只需以硬编码的方式,即构建代理集合!实际上是两个,我想。没什么大不了的,只需要转动曲柄......相当多,确保在这里为每个操作添加测试:
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
import itertools
Base= declarative_base()
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
# other attributes
value = Column(Integer)
def __init__(self, value):
self.value = value
@property
def context(self):
return HolderBySetDict(self)
@context.setter
def context(self, dict_):
toremove = set([ctx for ctx in self.attached_by if ctx.key not in dict_])
toadd = set([Context(key=k, holder=item) for k, v in dict_.items()
for item in itertools.chain(v)])
self.attached_by.update(toadd)
self.attached_by.difference_update(toremove)
class HolderBySetDict(object):
def __init__(self, parent):
self.parent = parent
def __iter__(self):
return iter(self.keys())
def keys(self):
return list(set(ctx.key for ctx in self.parent.attached_by))
def __delitem__(self, key):
toremove = set([ctx for ctx in self.parent.attached_by if ctx.key == key])
self.parent.attached_by.difference_update(toremove)
def __getitem__(self, key):
return HolderBySet(self.parent, key)
def __setitem__(self, key, value):
current = set([ctx for ctx in self.parent.attached_by if ctx.key == key])
toremove = set([ctx for ctx in current if ctx.holder not in value])
toadd = set([Context(key=key,holder=v) for v in value if v not in current])
self.parent.attached_by.update(toadd)
self.parent.attached_by.difference_update(toremove)
# exercises ! for the reader !
#def __contains__(self, key):
#def values(self):
#def items(self):
# ....
class HolderBySet(object):
def __init__(self, parent, key):
self.key = key
self.parent = parent
def __iter__(self):
return iter([ctx.holder for ctx
in self.parent.attached_by if ctx.key == self.key])
def update(self, items):
curr = set([ctx.holder for ctx
in self.parent.attached_by if ctx.key==self.key])
toadd = set(items).difference(curr)
self.parent.attached_by.update(
[Context(key=self.key, holder=item) for item in toadd])
def remove(self, item):
for ctx in self.parent.attached_by:
if ctx.key == self.key and ctx.holder is item:
self.parent.attached_by.remove(ctx)
break
else:
raise ValueError("Value not present")
def add(self, item):
for ctx in self.parent.attached_by:
if ctx.key == self.key and ctx.holder is item:
break
else:
self.parent.attached_by.add(Context(key=self.key, holder=item))
# more exercises ! for the reader !
#def __contains__(self, key):
#def union(self):
#def intersection(self):
#def difference(self):
#def difference_update(self):
# ....
class Context(Base):
__tablename__ = "context"
holder_id = Column(Integer, ForeignKey("a.id"), nullable=False, primary_key=True)
attachment_id = Column(Integer, ForeignKey("a.id"), nullable=False, primary_key=True)
key = Column(String, primary_key=True)
holder = relationship(A,
primaryjoin=lambda: Context.holder_id==A.id)
attachment = relationship(A,
primaryjoin=lambda: Context.attachment_id==A.id,
backref=backref("attached_by", collection_class=set))
a1 = A(1)
a2 = A(2)
a3, a4, a5 = A(3), A(4), A(5)
a1.context["key_1"] = set([a2])
a1.context["key_2"] = set([a3, a4, a5])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"]) == set([a2])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_2"]) == set([a3, a4, a5])
a10 = A(10)
a1.context["key_1"].add(a10)
print set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"]) == set([a2, a10])
a100 = A(100)
a101 = A(101)
a100.context = {
"key_1": set([a101])
}
assert set([ctx.holder for ctx in a100.attached_by]) == set([a101])