0

在我的多处理代码中,有几个工人用于并行处理。

工作人员应该只通过 request_queue 和带锁的共享值进行通信。

但现在看来,由于启动时的“分叉”,例如工人#4 和#5 共享同一个字典文档。我发现使用 id(document) 来查看内存地址。

由于将文档存储在 mongodb 中,其驱动程序正在将 _id 写回文档,因此出现了奇怪的错误。

必须确保每个工作人员都完全隔离预期队列和共享值,我现在不知道如何。

工人开始于:

for i in range(workers):
    Worker( request_queue,i,val, lock ).start()

class Worker(Process):
 def __init__(self, queue,ident,val,lock):
    super(Worker, self).__init__()

    self.queue= queue
    self.idstr= str(ident)
    self.val = val
    self.lock = lock
    dbconn = dbconnector.DBConnector()
    self.mongoconnection = dbconn.getMongoConnection()
    self.flagController = FlagController()
    print "Ident" + self.idstr

 def run(self):
    print 'Worker started'
    # do some initialization here

    print 'Worker Loop!'
    #time.sleep(5)
    try:
        for data in iter( self.queue.get, None ):
            mid = data["_id"]
            print "#" + self.idstr + " :  Mongoid " + str(mid)
            #time.sleep(5)
            try:

            timestamp = time.time()


            document = {"rawdata": data,
                                            "c": {
                                            "quelle": "t",
                                            "timestamp": mid.generation_time,
                                            "query" :  data["query"]                            
                                            }
                                            }

                    self.mongoconnection.insert("productive","input",document)

更新 我知道尝试通过构造函数传入一个新文档,并通过 self.document 在 Worker 内部使用它,但遗憾的是没有帮助。

4

1 回答 1

0

确保每个工作线程被隔离的一种方法是为每个工作线程提供其自己的文档实例变量。即用self.document 引用文档。

于 2013-08-15T16:30:19.587 回答