2
cities = DBSession.query(City).filter(City.big=='Y').options(joinedload(City.hash)).limit(1)

t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
    DBSession.commit() 

t0 = time.time()
w = SWorker(threads_no=1, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')

w.work()

print 'finished'

上面的代码使用 select for update 从表中选择关键字状态。它锁定该行,直到该行被更新。

如您所见,我更新了行并提交了更改。

kw_status.status = 1
DBSession.commit()    

之后,我创建了一个 SWorker 对象,该对象将任务放入队列并创建了许多处理队列的线程(为了简单起见,这里只有一个)。

工人在完成处理后更新

kw_status.status = 2
DBSession.commit()

此时我得到一个例外

(1205, 'Lock wait timeout exceeded; try restarting transaction') 'UPDATE g_search_keyword_status SET status=%s WHERE g_search_keyword_status.keyword_id = %s' (2, 10000001L)

所以似乎该行被锁定。但是在我开始工作之前,我已经将状态更新为 1 并且我已经提交了更改,因此应该解锁该行。

我也使用 scoped_session

DBSession = scoped_session(
    sessionmaker(
    autoflush=True,
    autocommit=False,
    bind=engine
    )
)
4

1 回答 1

1

问题是延迟加载。注意

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
DBSession.commit() 

上面的代码不会将结果加载到内存中,而是仅在实际需要时加载。

在我的 SWorker 中,我循环遍历 keywordStatus.keyword 对象,因此表锁定了 foreach thread 。

当我使用 all() 将结果加载到内存时问题解决了

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1).all()
于 2012-07-06T13:44:54.343 回答