4

所以......作为对 API 调用的响应,我这样做:

i = CertainObject(paramA=1, paramB=2)
i.save()

现在我的作家数据库有一条新记录。

处理可能需要一点时间,我不想推迟对 API 调用者的响应,所以下一行我将使用 Celery 将对象 ID 传输到异步作业:

run_async_job.delay(i.id)

立即或几秒钟后,具体取决于队列run_async_job尝试使用提供的 ID 从数据库加载记录。这是一场赌博。有时它有效,有时不取决于只读副本是否更新。

是否有模式可以保证成功并且不必在阅读前“睡”几秒钟或希望好运?

谢谢。

4

4 回答 4

2

最简单的方法似乎是使用 Greg 和 Elrond 在他们的回答中提到的重试。如果您使用 shared_task 或 @app.task 装饰器,则可以使用以下代码片段。

@shared_task(bind=True)
def your_task(self, certain_object_id):
    try:
        certain_obj = CertainObject.objects.get(id=certain_object_id)
        # Do your stuff
    except CertainObject.DoesNotExist as e:
        self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)

我在每次重试之间使用了指数倒计时。您可以根据需要对其进行修改。

您可以在此处找到自定义重试延迟的文档。还有另一个文档解释了这个链接中的指数退避

当您调用 retry 时,它会使用相同的任务 ID 发送一条新消息,并且它会注意确保将消息传递到与原始任务相同的队列中。您可以在此处的文档中阅读有关此内容的更多信息

于 2021-03-09T05:09:47.843 回答
2

由于写入然后立即加载它是一个高优先级,那么为什么不将它存储在基于内存的数据库中,如 Memcache 或 Redis。因此,一段时间后,您可以使用 celery 中的定期作业将其写入数据库,该作业将每隔一分钟左右运行一次。完成对 DB 的写入后,它将从 Redis/Memcache 中删除键。

您可以将数据保存在基于内存的数据库中一段时间​​,比如最需要数据的 1 小时。您还可以创建一个服务方法,该方法将检查数据是否在内存中。

Django Redis是一个很好的连接到 redis 的包(如果你在 Celery 中使用它作为代理)。

我提供了一些基于 Django 缓存的示例:

# service method

from django.core.cache import cache

def get_object(obj_id, model_cls):
    obj_dict = cache.get(obj_id, None)  # checks if obj id is in cache, O(1) complexity
    if obj_dict:
       return model_cls(**obj_dict)
    else:
       return model_cls.objects.get(id=obj_id)


# celery job

@app.task
def store_objects():
    logger.info("-"*25)
    # you can use .bulk_create() to reduce DB hits and faster DB entries
    for obj_id in cache.keys("foo_*"):
        CertainObject.objects.create(**cache.get(obj_id))
        cache.delete(obj_id)
    logger.info("-"*25)
       
于 2021-03-11T09:45:36.850 回答
1

DoesNotExist最简单的解决方案是捕获任务开始时抛出的任何错误,然后安排重试。这可以通过转换run_async_job为 a来完成Bound Task

@app.task(bind=True)
def run_async_job(self, object_id):
    try:
        instance = CertainObject.objects.get(id=object_id)
    except CertainObject.DoesNotExist:
        return self.retry(object_id)
于 2021-03-08T09:37:10.083 回答
0

本文深入探讨了如何处理复制数据库的写入后读取问题:https ://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-来自-replicas-58cc43973638

像作者一样,我知道没有万无一失的万能方法来处理读写不一致。

我之前使用的主要策略是使用某种expect_and_get(pk, max_attempts=10, delay_seconds=5)方法来尝试获取记录,并尝试max_attempts多次,在两次尝试之间延迟delay_seconds几秒钟。这个想法是它“期望”记录存在,因此它将一定数量的故障视为只是暂时的数据库问题。它比仅仅睡一段时间更可靠一点,因为它会更快地获取记录,并希望减少作业执行的延迟。

另一种策略是延迟从特殊save_to_read方法返回,直到只读副本具有值,或者通过以某种方式将新值同步推送到只读副本,或者只是轮询它们直到它们返回记录。这种方式似乎有点hackier IMO。

对于很多读取操作,您可能不必担心写入后读取的一致性:

如果我们要呈现用户所属的企业的名称,如果管理员在极少数情况下更改它,那么将更改传播给企业用户需要一分钟的时间,这真的没什么大不了的。

于 2021-03-07T16:06:28.177 回答