1

我们有一个 Django 应用程序,它在数据库中创建一个带有一些基本默认和初始信息的存根对象,然后将一条消息放到 Rabbit MQ 上以执行填充此存根对象数据所需的 IO 繁重工作。我们有一个单独的 rabbit MQ 消费者,它作为命令行脚本运行(由 daemontools 包装 - 所以它不会关闭),它处理来自 rabbit MQ 的消息。

所以这就是发生的事情 - 有时,当消息返回(此消息具有存根对象 ID)并且我们尝试使用 StubObject.objects.get(pk=message['ID']) 读取此对象时,我们会得到一个匹配的查询不存在异常(消息通常需要大约 30 秒才能返回)。但是,当我们检查数据库时,当我们得到查询不存在异常时,对象的数据肯定存在。这种情况大约在我们处理消息的 5 次中发生一次。我们不知道为什么会发生这种情况,并正在努力解决这个问题。特别奇怪的是它有时会失败。有任何想法吗?

我们尝试打印 connection.queries 但没有打印输出。

尝试保存的 rabbit MQ 消费者在标准 django python 文件之外运行,因此我们需要导入一堆东西以使其能够访问 ORM(不确定问题是否与我们如何执行此操作有关)。下面的代码:

#!/usr/bin/env python
import sys
import os
sys.path.append('/myproject')
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproject.settings'

from django.db import connection
from django.core import serializers
from django.core.management import setup_environ
from register_obj.models import Obj
from myproject import settings
import datetime
import json
import pika

setup_environ(settings)

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()

print '[*] Waiting for responses. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print "[x] Received %r" % (body,)
    objDict = json.loads(body)
        print objDict
    try:
        obj = Obj.objects.get(pk=objDict["id"])
        print connection.queries
    except Exception, e:
        print datetime.datetime.now()
        print "Error occurred, could not find obj by ID - "
        print objDict["id"]
        print e
        print '################Query##############'
        print connection.queries

    obj.updateDict(objDict)

    # Save the obj data
    obj.save()

channel.basic_consume(callback,
        queue='myQueue',
        no_ack=True)

channel.start_consuming()
4

1 回答 1

1

通常,这是由于 Django 应用程序提交事务和队列使用者之间的竞争条件。

最有可能的是,您正在运行设置为自动将每个请求包装在事务中的 Django 应用程序。即,它开始一个事务,执行所有数据库操作,然后在处理请求并将其发送回浏览器后提交(或者如果有任何抛出异常则回滚)。在那里的某个地方创建你的存根对象,然后使用该存根的 id 将任务放入队列中。当队列为空时问题就来了,因此您的消费者立即获得任务。同时,在 Django 端完成其余请求时有一点延迟(即使只是将字节通过线路发送到浏览器并关闭连接也可能需要一段时间)并且事务仍然打开。在事务关闭之前,数据库中该对象的行将不可用于其他事务,例如您的消费者。

解决方案是为将任务放入队列并在将其放入队列之前提交的视图切换到手动事务处理。

它看起来像这样:

from django.django.db import transaction

@transaction.commit_manually
def some_view(request):
    try:
        # do some work...
        stub = Obj.objects.create(...)
    except:
        transaction.rollback()
        raise
    else:
        transaction.commit()
        add_task_to_queue(obj_id=stub.id)
        # finish serving request
        ...

当然,一旦您手动处理事务,您需要非常小心,始终提交或回滚您打开的任何事务。

于 2013-02-16T04:36:01.567 回答