2

我正在尝试构建一个 Pyramid 应用程序。我从 SQLAlchemy 脚手架开始。我遇到了一个问题,我想知道解决它的最佳方法是什么。在我的一个观点中,我需要从两个不相关的表中选择很多行。我需要确保在从第一个表中选择行和从第二个表中选择行之间没有任何行插入到第二个表中。

我有三个模型,NodeTestTasking。两者Nodes都有Tests相当多的元数据。给定一个 的列表Nodes和一个 的列表,可以创建Tests一个 的全局列表。Taskings例如,我们可以有三个Nodes, a,bc两个Tests“我们需要一个节点来执行任务P”和“我们需要两个节点来执行任务Q”。

根据这些信息,Tasks应该创建三个。例如:

  1. “节点a应该做任务P
  2. “节点b应该做任务Q
  3. “节点c应该做任务Q

现在,我正在尝试为此提供一个 REST API。绝大多数时间客户将请求 的列表Tasks,因此需要快速。但是,有时客户可能会添加 aNodeTest. 发生这种情况时,我需要重新生成整个列表Tasks

这是一个粗略的例子:

@view_config(route_name='list_taskings')
def list_taskings(request):
    return DBSession.Query(Tasking).all()

@view_config(route_name='add_node')
def add_node(request):
    DBSession.add(Node())
    _update_taskings()

@view_config(route_name='add_test')
def add_test(request):
    DBSession.add(Test())
    _update_taskings()

def _update_taskings():
    nodes = DBSession.query(Node).all()
    tests = DBSession.query(Test).all()

    # Process...

    Tasking.query.delete()
    for t in taskings:
        DBSession.add(t)

我正在使用默认的 Pyramid SQLAlchemy 脚手架。因此,每个请求都会自动启动一个事务。因此,如果_update_tasking从一个请求(例如add_node)调用,那么新节点将被添加到本地DBSession,并且查询 allNodesTestsin_update_tasking将返回该新元素。此外,删除所有现有的Taskings并添加新计算的也是安全的。

我有两个问题:

  1. 如果在Tests我得到我的 list ofnodes和我的 list of testsin之间将新行添加到表中会发生_update_taskings什么?在我的现实世界生产系统中,这些选择很接近,但并不相邻。存在竞争条件的可能性。

  2. 如何确保将更新的两个请求Taskings不会相互覆盖?例如,想象一下如果我们现有的系统有一个Node和一个Test. 两个请求同时进来,一个添加 a Node,一个添加 a Test。即使问题 #1 不是问题,并且我知道每个请求的一对选择代表“数据库中的单个时间实例”,仍然存在一个请求覆盖另一个请求的问题。如果第一个请求首先以 now two Nodesand one完成Test,则第二个请求仍将选择旧数据(可能)并将生成一个Taskings包含 oneNode和 two的列表Tests

那么,处理这个问题的最佳方法是什么?我在开发中使用 SQLite,在生产中使用 PostgreSQL,但我想要一个与数据库无关的解决方案。我不担心其他应用程序访问这个数据库。我的 REST API 将是唯一的访问机制。我应该锁定任何改变数据库的请求(添加 aNode或 a Test)吗?我应该以某种方式锁定数据库吗?

谢谢你的帮助!

4

1 回答 1

5

使用可序列化事务隔离级别应该可以防止这两个问题。如果一个事务修改了可能影响另一个事务中先前读取结果的数据,则存在序列化冲突。只有一个事务获胜,所有其他事务都被数据库中止,由客户端重新启动。SQLite 通过锁定整个数据库来做到这一点,而 PostgreSQL 采用了更复杂的机制(有关详细信息,请参阅文档)。不幸的是,没有可移植的 sqlalchemic 方法来捕获序列化异常并重试。您需要编写特定于数据库的代码以可靠地将其与其他错误区分开来。

我已经建立了一个示例程序,其中两个线程同时修改数据(您的方案的非常基本的复制),遇到冲突并重试:

https://gist.github.com/khayrov/6291557

使用 Pyramid 事务中间件和 Zope 事务管理器,这将更加容易。在捕获序列化错误后,raiseTransientError和中间件将重试整个请求,而不是手动重试tm.attempts(在 paste 配置中)次。

from transaction.interfaces import TransientError

class SerializationConflictError(TransientError):
    def __init__(self, orig):
        self.orig = orig

您甚至可以编写自己的pyramid_tm位于堆栈下方的中间件,该中间件将捕获序列化错误并将它们透明地转换为瞬态错误。

def retry_serializable_tween_factory(handler, registry):

    def retry_tween(request):
        try:
            return handler(request)
        except DBAPIError, e:
            orig = e.orig
            if getattr(orig, 'pgcode', None) == '40001':
                raise SerializationConflictError(e)
            elif isinstance(orig, sqlite3.DatabaseError) and \
                orig.args == ('database is locked',):
                raise SerializationConflictError(e)
            else:
                raise

    return retry_tween
于 2013-08-21T08:09:19.470 回答