0

我正在使用的堆栈包括 tornado(async) 和 mongodb(motor) 我有一个用于处理请求数据的以下算法:

  1. 来自传入请求的数据保存到特定于事件的集合中
  2. 数据正在保存到统一的事件集合中

这是请求处理程序代码:

class EventHandler(BaseHandler):
    """ Handles all event requests
    """

    @gen.coroutine
    def post(self):
        """ Posts an event data
        """
        yield gen.Task(self.check_auth)
        self.validate_data()
        yield self._save_user()

        status_msg = yield self.save_entity()
        yield self.save_event()

        self.set_status(200, reason="OK, {}".format(status_msg))

这是从请求处理程序调用的方法的代码

@gen.coroutine
def save_entity(self):
    """ Saves event entity data for proper collection. Entities: orders, pageviews, users etc
    """
    event = self.data.get("event_type")
    if event not in self._event_schema_map.keys():
        raise Return("No specific entity, just event")
    try:
        if event == "cart_add":
            msg = yield gen.Task(self._save_product)
        elif event == "cart_delete":
            msg = yield gen.Task(self._delete_product)
        elif event == "pageview":
            msg = yield gen.Task(self._save_pageview)
        elif event == "order_complete":
            msg = yield gen.Task(self._save_order)
        elif event in ["email_known", "email_form"]:
            msg = yield gen.Task(self._save_email)
    except Exception as e:
        raise HTTPError(500, log_message=str(e))
    raise Return(msg)

@gen.coroutine
def save_event(self, event=None, event_type=None, event_data=None):
    """ Saves event data to db. Works both as standalone method and as plug-in method
    :param event: event name
    :param event_type: event type
    :param event_data: dict with event-specific infoelements data
    """
    yield self.motor.events.insert(
        {
            "client_id": self.data.get("client_id"),
            "user_id": self.data.get("user_id"),
            "timestamp": datetime.now(),
            "event": self.data.get("event", event),
            "event_type": self.data.get("event_type", event_type),
            "event_data": self.data.get("event_data", event_data),
            "event_url": self.data.get("event_url"),
            "utms": self.data.get("utms"),
            "analytics_short": self.data.get("analytics_short"),
            "analytics_long": self.data.get("analytics_long")
        }
    )

一切_save_%smth%都只是简单的电机 CRUD 操作,封装在函数调用中并包装在@engine装饰器中,如下所示:

@gen.engine
def _save_product(self, callback=None):
    """ Adds product to user's cart
    """
    cart_data = self.data.get("event_data")[0]
    try:
        yield self.motor.users.update(
            {"_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                '$set': {
                    "cart_updated_at": datetime.now(),
                    "reminder": False,
                },
                '$push': {
                    "items": {
                        "product_id": cart_data.get("product_id"),
                        "image": cart_data.get("image"),
                        "title": cart_data.get("title"),
                        "price": int(cart_data.get("price"))
                    }
                }
            },
            upsert=True
        )
    except Exception as e:
        raise HTTPError(500, log_message=str(e))
    callback("New product in cart record added")

@gen.engine
def _save_order(self, callback=None):
    """ Saves order data to user's orders
    """
    order_data = self.data.get("event_data")
    try:
        yield self.motor.orders.update(
            {"user_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                '$push': {
                    "orders": {
                        "completed_at": datetime.now(),
                        "analytics_short": self.data["analytics_short"],
                        "analytics_long": self.data["analytics_long"],
                        "utms": self.data["utms"],
                        "items": [
                            {
                                "product_id": i["product_id"],
                                "price": int(i["price"]),
                                "quantity": int(i["quantity"])
                            }
                            for i in order_data
                        ]
                    }
                }
            },
            upsert=True,
        )
    except Exception as e:
        raise HTTPError(500, log_message="Error in order updating: {}".format(e))
    try:
        yield self.motor.users.update(
            {"_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                "$unset": {
                    "cart_created_at": '',
                    "cart_updated_at": '',
                    "reminder": '',
                    "items": ''
                }
            }
        )
    except Exception as e:
        raise HTTPError(500, log_message="Error in cart updating: {}".format(e))
    callback("Order record added")

因此请求数据在不同的集合中保存了两次: save_entity函数中的“特定的”和save_event函数中的“通用的” 。但实际上我看到,经常(大约 50% 的情况)被遗漏(数据未保存)并执行第二次保存。

所有数据处理和验证都是在之前进行的,因此假设抛出给 mongo 的数据是合适且有效的。

所以我想弄清楚,这种情况是如何发生的。我的猜测是 save_entity 函数设计不良,并且由于几个嵌入式函数,请求本身完成并且数据没有保存到数据库中。可以吗?

UPD添加了生产代码,所以现在情况会更清楚。我希望:) UPD 2添加了几个 CRUD 方法

4

1 回答 1

0

"update" 需要两个参数:一个指定要更新哪些文档的查询和一个更新文档。该查询遵循与 find() 或 find_one() 相同的语法。更新文档有两种模式:可以替换整个文档,也可以更新文档的某些字段。"update" 还接受一些可选参数,包括 "multi" 和 "upsert"。有关“更新”方法的更多信息,请参阅教程:

http://motor.readthedocs.org/en/stable/tutorial.html#updating-documents

在您的代码中,您使用第一个参数而不是第二个参数调用“更新”。我希望您的代码会抛出“TypeError: update() missing 1 required positional argument: 'document'”,并且异常正在被吞没,或者丢失在日志文件中,位于调用链的更高位置。

于 2014-11-29T17:27:32.770 回答