I have been given the task of updating a database over the network with SQLAlchemy, and I have decided to use Python's threading module. Currently one thread, the producer, feeds work units to the consumer threads through a queue.

The producer thread does something like this:

  def produce(self, last_id):
    # Stream unprocessed requests in batches of 50 and queue each one.
    requests = session.query(Request).order_by(Request.id) \
        .filter(Request.item_id == None).yield_per(50)
    for unit in requests:
      self.queue.put(unit, True, Master.THREAD_TIMEOUT)

while the consumer threads do something similar to this:

  def consume(self):
    unit = self.queue.get()
    request = unit
    item = Item.get_item_by_url(request)
    request.item = item
    session.add(request)
    session.flush()

and I am using sqlalchemy's scoped session:

session = scoped_session(sessionmaker(autocommit=True, autoflush=True, bind=engine))

However, I am getting the exception,

"sqlalchemy.exc.InvalidRequestError: Object FOO is already attached to session '1234' (this is '5678')"

I understand that this exception comes from the fact that the request object is created in one session (the producer session) while the consumers are using another scoped session because they belong to another thread.
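
To illustrate the point about per-thread sessions, here is a minimal sketch, assuming SQLAlchemy's default thread-local scoping and an in-memory SQLite engine used only for demonstration. The `seen` dictionary and the `record` helper are hypothetical names, not part of my real code.

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# In-memory SQLite engine, for illustration only; no query is ever issued.
engine = create_engine("sqlite://")
session = scoped_session(sessionmaker(bind=engine))

seen = {}  # hypothetical helper: thread label -> Session object


def record(name):
    # Calling the scoped_session returns the Session registered
    # for the thread that makes the call.
    seen[name] = session()


record("producer")
worker = threading.Thread(target=record, args=("consumer",))
worker.start()
worker.join()

# The same thread gets the same Session back; another thread gets its own.
same_thread_reuses = seen["producer"] is session()
threads_differ = seen["producer"] is not seen["consumer"]
print(same_thread_reuses, threads_differ)
```

An object loaded through the producer's Session therefore still belongs to that Session when a consumer thread tries to add it to its own.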

My workaround is to have the producer thread put request.id on the queue, while the consumer calls the code below to retrieve the request object.

request = session.query(Request).filter(Request.id == request_id).first()

I do not like this solution because it costs an extra network round trip per request, which is clearly not optimal.

  1. Are there ways to avoid wasting the result of the producer's db call?
  2. Is there a way to write the "produce" so that more than 1 id is passed into the queue as a work unit?

Feedback welcomed!

1 Answer

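You need to detach the Request instance from the main thread's session before you put it into the queue, then attach it to the queue-processing thread's session when it is taken off the queue again.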

To detach, call .expunge() on the session, passing in the request:

session.expunge(unit)

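Then, when processing it in a queue thread, re-attach it by merging; set the load flag to False to prevent another round trip to the database. Note that merge() returns the instance attached to the current session, so keep working with the return value:

request = session.merge(request, load=False)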
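
Putting the two calls together, the whole flow might look like the sketch below. It is not the asker's actual code: it assumes SQLAlchemy 1.4 or later, a throwaway file-backed SQLite database, a simplified `Request` model with a plain integer `item_id`, explicit commits instead of `autocommit=True`, and a placeholder computation in place of `Item.get_item_by_url`. The names `work` and `DONE` are hypothetical.

```python
import os
import queue
import tempfile
import threading

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()


class Request(Base):
    # Simplified stand-in for the real model (assumption).
    __tablename__ = "request"
    id = Column(Integer, primary_key=True)
    url = Column(String, nullable=False)
    item_id = Column(Integer, nullable=True)


# File-backed SQLite so that every thread sees the same database.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
engine = create_engine("sqlite:///" + db_path)
Base.metadata.create_all(engine)
session = scoped_session(sessionmaker(bind=engine))

work = queue.Queue()
DONE = object()  # sentinel telling the consumer to stop


def produce():
    session.add_all([Request(url="http://example.com/%d" % i) for i in range(3)])
    session.commit()
    pending = (session.query(Request)
               .filter(Request.item_id.is_(None))
               .order_by(Request.id)
               .all())
    for unit in pending:
        session.expunge(unit)  # detach from the producer's session
        work.put(unit)
    work.put(DONE)
    session.remove()  # release the producer's session and connection


def consume():
    while True:
        unit = work.get()
        if unit is DONE:
            break
        # merge() returns the copy attached to this thread's session;
        # load=False skips the SELECT that merge would otherwise issue.
        request = session.merge(unit, load=False)
        request.item_id = 100 + request.id  # placeholder for the real lookup
        session.commit()
    session.remove()


produce()
worker = threading.Thread(target=consume)
worker.start()
worker.join()

# Read the rows back in the main thread to confirm the updates were saved.
item_ids = [r.item_id for r in session.query(Request).order_by(Request.id)]
session.remove()
print(item_ids)
```

One caveat: merge() with load=False expects the incoming object to be clean, so expunge it before making any changes and modify only the merged copy.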
answered 2012-09-08T07:49:29.200