2

我有一个 MongoDB 集合(用作作业队列),多个进程从中读取记录,使用findAndModify. FindAndModify搜索active字段为“false”的记录,将其设置为“true”,这样其他进程就不会读取相同的记录。

问题是查看日志,我看到不同的进程仍然读取相同的记录。这似乎发生在两个进程同时从队列中读取时。有什么方法可以确保一次只处理从集合中读取的内容?

我正在使用 Mongo 2.2.3 和 pymongo 2.2。

非常感谢!

编辑:有问题的日志是:

worker.3 2013-03-18 23:57:45,434 default-worker-3
project_name INFO 队列作业:作业 ID:5147a90f68e8fe0097002bdf

worker.3 2013-03-18 23:57:47,608 default-worker-3
project_name INFO 输入:14497 文档

worker.2 2013-03-18 23:57:45,440 default-worker-2
project_name INFO 队列作业:作业 ID:5147a90f68e8fe0097002bdf

worker.2 2013-03-18 23:57:47,658 default-worker-2
project_name INFO 输入:14497 文档

如您所见,worker.3 和worker.2 从队列中读取相同的作业(两个worker 具有相同的mongodb id)。

find_and_modify 命令:

query = {"active": False}
try:
    return self.collection.find_and_modify(
            query=query,
            update={"$set": {"active": True}},
            upsert=False,
            sort={"added_on": 1},
            limit=1
        )
except Exception, exc:
    LOGGER.exception(exc)
4

3 回答 3

4

让我说得非常清楚 - 两个不同的 findAndModify 命令不可能在您的场景中返回相同的文档。

不可能。以下是执行该工作的方法的前几行:

    Lock::DBWrite lk( ns );
    Client::Context cx( ns );

    BSONObj doc;

    bool found = Helpers::findOne( ns.c_str() , queryOriginal , doc );

请注意第 122 行,其中在查找之前获取了 WRITE 锁。

https://github.com/mongodb/mongo/blob/master/src/mongo/db/commands/find_and_modify.cpp#L122

两个进程不可能同时持有写锁。似乎更有可能发生了一些不同的事情(多个文档具有相同的 id 值,调用 find_and_modify 的函数返回相同的文档,将其返回给两个线程,其他我们不知道足以推测的事情)。

FindAndModify 是一个原子命令,它在执行期间持有一个独占写锁。我的建议是深入了解日志真正向您显示的内容,而不是根据关于必须发生的事情的不正确/无根据的假设来更改您的代码。

于 2013-03-21T04:11:05.187 回答
1

而是将您的“锁定”分为两个阶段。第一个更新记录查询第一个没有锁时间戳或时间戳过期的对象并设置新锁。然后使用您刚刚建立的锁定数据找到相同的对象。

于 2013-03-19T16:40:09.723 回答
1

首先对mongodb进行一个简单的查询以获取单个作业记录,

job = db.coll.find({query}).limit(1)

接下来更新指定作业 ID 和位置的记录active=false

update_response = db.coll.update(
    {_id:job.id, active=false},
    {$set:{active:true}},
    false,
    false
)

如果作业已经被另一个进程更新,由于 的查询约束,更新不会成功active=false。检查记录已更新的 update_response:

if update_response['n'] > 0 and update_response['updatedExisting']==true:
    return job

如果您的更新未成功,请获取另一个作业并重试。

于 2013-03-19T17:18:24.113 回答