18

请原谅术语中的任何错误。特别是,我正在使用关系数据库术语。

有许多持久键值存储,包括CouchDBCassandra,以及许多其他项目。

反对它们的一个典型论点是它们通常不允许跨多行或多表的原子事务。我想知道是否有一种通用方法可以解决这个问题。

以一组银行账户的情况为例。我们如何将钱从一个银行账户转移到另一个银行账户?如果每个银行账户都是一行,我们希望更新两行作为同一交易的一部分,减少一个的值并增加另一个的值。

一种明显的方法是有一个单独的表来描述事务。然后,将资金从一个银行账户转移到另一个银行账户只需在该表中插入一个新行。我们不存储两个银行账户中的任何一个的当前余额,而是依赖于汇总交易表中所有适当的行。然而,很容易想象这将是太多的工作。一家银行每天可能有数百万笔交易,而一个单独的银行账户可能很快就会有数千笔与之相关的“交易”。

如果基础数据自您上次抓取后发生更改,则许多(全部?)键值存储将“回滚”操作。可能这可用于模拟原子事务,然后,您可以指示特定字段已锁定。这种方法存在一些明显的问题。

还有其他想法吗?完全有可能我的方法完全不正确,而且我还没有把我的大脑包裹在新的思维方式上。

4

5 回答 5

13

以您为例,如果您想以原子方式更新单个文档(关系术语中的行)中的值,则可以在 CouchDB 中执行此操作。如果其他竞争客户端在您阅读后更新了同一个文档,则在您尝试提交更改时会出现冲突错误。然后,您必须读取新值、更新并重试提交。您可能必须重复此过程的次数不确定(如果有很多争用,可能是无限次),但如果您的提交成功,您可以保证在数据库中有一个具有原子更新余额的文档。

如果您需要更新两个余额(即从一个账户转移到另一个账户),那么您需要使用单独的交易文档(实际上是另一个表,其中行是交易)存储金额和两个账户(进出) . 顺便说一句,这是一种常见的簿记做法。由于 CouchDB 仅根据需要计算视图,因此从列出该帐户的交易中计算帐户中的当前金额实际上仍然非常有效。在 CouchDB 中,您将使用映射函数,该函数将帐号作为键和交易金额(正数表示传入,负数表示传出)。您的 reduce 函数将简单地将每个键的值相加,发出相同的键和总和。然后,您可以使用 group=True 的视图来获取由帐号键入的帐户余额。

于 2009-07-07T20:56:26.677 回答
5

CouchDB 不适合事务系统,因为它不支持锁定和原子操作。

为了完成银行转帐,您必须做一些事情:

  1. 验证交易,确保源账户中有足够的资金,两个账户都是开放的、没有锁定的,并且信誉良好,等等
  2. 减少源账户的余额
  3. 增加目的账户余额

如果在这些步骤中的任何一个步骤之间对帐户的余额或状态进行了更改,则交易在提交后可能会变得无效,这在此类系统中是一个大问题。

即使您使用上面建议的方法插入“转移”记录并使用 map/reduce 视图来计算最终帐户余额,您也无法确保不会透支源帐户,因为仍然存在检查源帐户余额和插入交易之间的竞争条件,在检查余额后可以同时添加两个交易。

所以......这是工作的错误工具。CouchDB 可能擅长很多事情,但这确实是它做不到的。

编辑:可能值得注意的是,现实世界中的实际银行使用最终一致性。如果您透支银行帐户的时间足够长,您将获得透支费。如果你做得很好,你甚至可以几乎同时从两台不同的 ATM 取款并透支你的账户,因为检查余额、发放资金和记录交易存在竞争条件。当您将支票存入您的帐户时,他们会增加余额,但实际上会在一段时间内“以防万一”源帐户确实没有足够的钱。

于 2011-10-25T03:38:47.117 回答
5

提供一个具体的例子(因为网上缺乏正确的例子令人惊讶):这里是如何在 CouchDB 中实现“原子银行余额转移”(主要复制自我关于同一主题的博客文章:http://blog.codekills .net/2014/03/13/atomic-bank-balance-transfer-with-couchdb/

首先,简要回顾一下这个问题:如何设计一个允许在账户之间转移资金的银行系统,以便不存在可能留下无效或无意义余额的竞争条件?

这个问题有几个部分:

第一:事务日志。不是将账户余额存储在单个记录或文档中——{"account": "Dave", "balance": 100}而是通过汇总该账户的所有贷方和借方来计算账户余额。这些贷方和借方存储在事务日志中,可能如下所示:

{"from": "Dave", "to": "Alex", "amount": 50}
{"from": "Alex", "to": "Jane", "amount": 25}

用于计算余额的 CouchDB map-reduce 函数可能如下所示:

POST /transactions/balances
{
    "map": function(txn) {
        emit(txn.from, txn.amount * -1);
        emit(txn.to, txn.amount);
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

为了完整起见,以下是余额列表:

GET /transactions/balances
{
    "rows": [
        {
            "key" : "Alex",
            "value" : 25
        },
        {
            "key" : "Dave",
            "value" : -50
        },
        {
            "key" : "Jane",
            "value" : 25
        }
    ],
    ...
}

但这留下了一个明显的问题:错误是如何处理的?如果有人试图进行大于余额的转账会发生什么?

对于 CouchDB(和类似的数据库),这种业务逻辑和错误处理必须在应用程序级别实现。天真地,这样的函数可能看起来像这样:

def transfer(from_acct, to_acct, amount):
    txn_id = db.post("transactions", {"from": from_acct, "to": to_acct, "amount": amount})
    if db.get("transactions/balances") < 0:
        db.delete("transactions/" + txn_id)
        raise InsufficientFunds()

但是请注意,如果应用程序在插入交易和检查更新的余额之间崩溃,数据库将处于不一致的状态:发送方可能会留下负余额,而接收方可能会收到以前不存在的钱:

// Initial balances: Alex: 25, Jane: 25
db.post("transactions", {"from": "Alex", "To": "Jane", "amount": 50}
// Current balances: Alex: -25, Jane: 75

如何解决这个问题?

为了确保系统永远不会处于不一致状态,需要在每个事务中添加两条信息:

  1. 创建交易的时间(以确保交易的严格总排序),以及

  2. 状态——交易是否成功。

还需要有两个视图——一个返回账户的可用余额(即所有“成功”交易的总和),另一个返回最旧的“待处理”交易:

POST /transactions/balance-available
{
    "map": function(txn) {
        if (txn.status == "successful") {
            emit(txn.from, txn.amount * -1);
            emit(txn.to, txn.amount);
        }
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

POST /transactions/oldest-pending
{
    "map": function(txn) {
        if (txn.status == "pending") {
            emit(txn._id, txn);
        }
    },
    "reduce": function(keys, values) {
        var oldest = values[0];
        values.forEach(function(txn) {
            if (txn.timestamp < oldest) {
                oldest = txn;
            }
        });
        return oldest;
    }

}

转移列表现在可能如下所示:

{"from": "Alex", "to": "Dave", "amount": 100, "timestamp": 50, "status": "successful"}
{"from": "Dave", "to": "Jane", "amount": 200, "timestamp": 60, "status": "pending"}

接下来,应用程序将需要一个函数,该函数可以通过检查每个待处理的交易来解决交易,以验证它是否有效,然后将其状态从“待处理”更新为“成功”或“拒绝”:

def resolve_transactions(target_timestamp):
    """ Resolves all transactions up to and including the transaction
        with timestamp `target_timestamp`. """
    while True:
        # Get the oldest transaction which is still pending
        txn = db.get("transactions/oldest-pending")
        if txn.timestamp > target_timestamp:
            # Stop once all of the transactions up until the one we're
            # interested in have been resolved.
            break

        # Then check to see if that transaction is valid
        if db.get("transactions/available-balance", id=txn.from) >= txn.amount:
            status = "successful"
        else:
            status = "rejected"

        # Then update the status of that transaction. Note that CouchDB
        # will check the "_rev" field, only performing the update if the
        # transaction hasn't already been updated.
        txn.status = status
        couch.put(txn)

最后,正确执行传输的应用程序代码:

def transfer(from_acct, to_acct, amount):
    timestamp = time.time()
    txn = db.post("transactions", {
        "from": from_acct,
        "to": to_acct,
        "amount": amount,
        "status": "pending",
        "timestamp": timestamp,
    })
    resolve_transactions(timestamp)
    txn = couch.get("transactions/" + txn._id)
    if txn_status == "rejected":
        raise InsufficientFunds()

几点注意事项:

  • 为简洁起见,这个特定的实现假设了 CouchDB 的 map-reduce 中的一些原子性。更新代码使其不依赖于该假设作为练习留给读者。

  • 没有考虑主/主复制或 CouchDB 的文档同步。主/主复制和同步使这个问题变得更加困难。

  • 在实际系统中,使用time()可能会导致冲突,因此使用具有更多熵的东西可能是个好主意;也许,或者在排序中"%s-%s" %(time(), uuid())使用文档。_id包括时间并不是绝对必要的,但如果多个请求大约同时进入,它有助于保持逻辑。

于 2014-03-07T08:01:28.877 回答
1

BerkeleyDB 和 LMDB 都是支持 ACID 事务的键值对存储。在 BDB 中,txns 是可选的,而 LMDB 仅以事务方式运行。

于 2014-08-05T00:51:05.170 回答
1

反对它们的一个典型论点是它们通常不允许跨多行或多表的原子事务。我想知道是否有一种通用方法可以解决这个问题。

许多现代数据存储不支持开箱即用的原子多键更新(事务),但它们中的大多数提供了允许您构建 ACID 客户端事务的原语。

如果数据存储支持每个键的线性化和比较和交换或测试和设置操作,那么实现可序列化事务就足够了。例如,这种方法用于Google 的 PercolatorCockroachDB数据库。

在我的博客中,我创建了可序列化跨分片客户端事务的逐步可视化,描述了主要用例并提供了算法变体的链接。我希望它能帮助您了解如何为您的数据存储实现它们。

支持每个键线性化和 CAS 的数据存储包括:

  • 具有轻量级事务的 Cassandra
  • 具有一致桶的 Riak
  • 重新思考数据库
  • 动物园管理员
  • 其他
  • HBase
  • 动态数据库
  • MongoDB

顺便说一句,如果您对 Read Committed 隔离级别没问题,那么看看Peter Bailis的RAMP 事务是有意义的。它们也可以针对同一组数据存储实现。

于 2016-03-04T06:46:24.913 回答