在我的多处理代码中,有几个工人用于并行处理。
工作人员应该只通过 request_queue 和带锁的共享值进行通信。
但现在看来,由于启动时的“分叉”,例如工人#4 和#5 共享同一个字典文档。我发现使用 id(document) 来查看内存地址。
由于将文档存储在 mongodb 中,其驱动程序正在将 _id 写回文档,因此出现了奇怪的错误。
必须确保每个工作人员都完全隔离预期队列和共享值,我现在不知道如何。
工人开始于:
for i in range(workers):
Worker( request_queue,i,val, lock ).start()
class Worker(Process):
def __init__(self, queue,ident,val,lock):
super(Worker, self).__init__()
self.queue= queue
self.idstr= str(ident)
self.val = val
self.lock = lock
dbconn = dbconnector.DBConnector()
self.mongoconnection = dbconn.getMongoConnection()
self.flagController = FlagController()
print "Ident" + self.idstr
def run(self):
print 'Worker started'
# do some initialization here
print 'Worker Loop!'
#time.sleep(5)
try:
for data in iter( self.queue.get, None ):
mid = data["_id"]
print "#" + self.idstr + " : Mongoid " + str(mid)
#time.sleep(5)
try:
timestamp = time.time()
document = {"rawdata": data,
"c": {
"quelle": "t",
"timestamp": mid.generation_time,
"query" : data["query"]
}
}
self.mongoconnection.insert("productive","input",document)
更新 我知道尝试通过构造函数传入一个新文档,并通过 self.document 在 Worker 内部使用它,但遗憾的是没有帮助。