我正在使用的堆栈包括 tornado(async) 和 mongodb(motor) 我有一个用于处理请求数据的以下算法:
- 来自传入请求的数据保存到特定于事件的集合中
- 数据正在保存到统一的事件集合中
这是请求处理程序代码:
class EventHandler(BaseHandler):
""" Handles all event requests
"""
@gen.coroutine
def post(self):
""" Posts an event data
"""
yield gen.Task(self.check_auth)
self.validate_data()
yield self._save_user()
status_msg = yield self.save_entity()
yield self.save_event()
self.set_status(200, reason="OK, {}".format(status_msg))
这是从请求处理程序调用的方法的代码
@gen.coroutine
def save_entity(self):
""" Saves event entity data for proper collection. Entities: orders, pageviews, users etc
"""
event = self.data.get("event_type")
if event not in self._event_schema_map.keys():
raise Return("No specific entity, just event")
try:
if event == "cart_add":
msg = yield gen.Task(self._save_product)
elif event == "cart_delete":
msg = yield gen.Task(self._delete_product)
elif event == "pageview":
msg = yield gen.Task(self._save_pageview)
elif event == "order_complete":
msg = yield gen.Task(self._save_order)
elif event in ["email_known", "email_form"]:
msg = yield gen.Task(self._save_email)
except Exception as e:
raise HTTPError(500, log_message=str(e))
raise Return(msg)
@gen.coroutine
def save_event(self, event=None, event_type=None, event_data=None):
""" Saves event data to db. Works both as standalone method and as plug-in method
:param event: event name
:param event_type: event type
:param event_data: dict with event-specific infoelements data
"""
yield self.motor.events.insert(
{
"client_id": self.data.get("client_id"),
"user_id": self.data.get("user_id"),
"timestamp": datetime.now(),
"event": self.data.get("event", event),
"event_type": self.data.get("event_type", event_type),
"event_data": self.data.get("event_data", event_data),
"event_url": self.data.get("event_url"),
"utms": self.data.get("utms"),
"analytics_short": self.data.get("analytics_short"),
"analytics_long": self.data.get("analytics_long")
}
)
一切_save_%smth%
都只是简单的电机 CRUD 操作,封装在函数调用中并包装在@engine
装饰器中,如下所示:
@gen.engine
def _save_product(self, callback=None):
""" Adds product to user's cart
"""
cart_data = self.data.get("event_data")[0]
try:
yield self.motor.users.update(
{"_id": self.data["user_id"], "client_id": self.data["client_id"]},
{
'$set': {
"cart_updated_at": datetime.now(),
"reminder": False,
},
'$push': {
"items": {
"product_id": cart_data.get("product_id"),
"image": cart_data.get("image"),
"title": cart_data.get("title"),
"price": int(cart_data.get("price"))
}
}
},
upsert=True
)
except Exception as e:
raise HTTPError(500, log_message=str(e))
callback("New product in cart record added")
@gen.engine
def _save_order(self, callback=None):
""" Saves order data to user's orders
"""
order_data = self.data.get("event_data")
try:
yield self.motor.orders.update(
{"user_id": self.data["user_id"], "client_id": self.data["client_id"]},
{
'$push': {
"orders": {
"completed_at": datetime.now(),
"analytics_short": self.data["analytics_short"],
"analytics_long": self.data["analytics_long"],
"utms": self.data["utms"],
"items": [
{
"product_id": i["product_id"],
"price": int(i["price"]),
"quantity": int(i["quantity"])
}
for i in order_data
]
}
}
},
upsert=True,
)
except Exception as e:
raise HTTPError(500, log_message="Error in order updating: {}".format(e))
try:
yield self.motor.users.update(
{"_id": self.data["user_id"], "client_id": self.data["client_id"]},
{
"$unset": {
"cart_created_at": '',
"cart_updated_at": '',
"reminder": '',
"items": ''
}
}
)
except Exception as e:
raise HTTPError(500, log_message="Error in cart updating: {}".format(e))
callback("Order record added")
因此请求数据在不同的集合中保存了两次: save_entity函数中的“特定的”和save_event函数中的“通用的” 。但实际上我看到,经常(大约 50% 的情况)被遗漏(数据未保存)并执行第二次保存。
所有数据处理和验证都是在之前进行的,因此假设抛出给 mongo 的数据是合适且有效的。
所以我想弄清楚,这种情况是如何发生的。我的猜测是 save_entity 函数设计不良,并且由于几个嵌入式函数,请求本身完成并且数据没有保存到数据库中。可以吗?
UPD添加了生产代码,所以现在情况会更清楚。我希望:) UPD 2添加了几个 CRUD 方法