提供一个具体的例子(因为网上缺乏正确的例子令人惊讶):这里是如何在 CouchDB 中实现“原子银行余额转移”(主要复制自我关于同一主题的博客文章:http://blog.codekills .net/2014/03/13/atomic-bank-balance-transfer-with-couchdb/)
首先,简要回顾一下这个问题:如何设计一个允许在账户之间转移资金的银行系统,以便不存在可能留下无效或无意义余额的竞争条件?
这个问题有几个部分:
第一:事务日志。不是将账户余额存储在单个记录或文档中——{"account": "Dave", "balance": 100}
而是通过汇总该账户的所有贷方和借方来计算账户余额。这些贷方和借方存储在事务日志中,可能如下所示:
{"from": "Dave", "to": "Alex", "amount": 50}
{"from": "Alex", "to": "Jane", "amount": 25}
用于计算余额的 CouchDB map-reduce 函数可能如下所示:
POST /transactions/balances
{
"map": function(txn) {
emit(txn.from, txn.amount * -1);
emit(txn.to, txn.amount);
},
"reduce": function(keys, values) {
return sum(values);
}
}
为了完整起见,以下是余额列表:
GET /transactions/balances
{
"rows": [
{
"key" : "Alex",
"value" : 25
},
{
"key" : "Dave",
"value" : -50
},
{
"key" : "Jane",
"value" : 25
}
],
...
}
但这留下了一个明显的问题:错误是如何处理的?如果有人试图进行大于余额的转账会发生什么?
对于 CouchDB(和类似的数据库),这种业务逻辑和错误处理必须在应用程序级别实现。天真地,这样的函数可能看起来像这样:
def transfer(from_acct, to_acct, amount):
txn_id = db.post("transactions", {"from": from_acct, "to": to_acct, "amount": amount})
if db.get("transactions/balances") < 0:
db.delete("transactions/" + txn_id)
raise InsufficientFunds()
但是请注意,如果应用程序在插入交易和检查更新的余额之间崩溃,数据库将处于不一致的状态:发送方可能会留下负余额,而接收方可能会收到以前不存在的钱:
// Initial balances: Alex: 25, Jane: 25
db.post("transactions", {"from": "Alex", "To": "Jane", "amount": 50}
// Current balances: Alex: -25, Jane: 75
如何解决这个问题?
为了确保系统永远不会处于不一致状态,需要在每个事务中添加两条信息:
创建交易的时间(以确保交易的严格总排序),以及
状态——交易是否成功。
还需要有两个视图——一个返回账户的可用余额(即所有“成功”交易的总和),另一个返回最旧的“待处理”交易:
POST /transactions/balance-available
{
"map": function(txn) {
if (txn.status == "successful") {
emit(txn.from, txn.amount * -1);
emit(txn.to, txn.amount);
}
},
"reduce": function(keys, values) {
return sum(values);
}
}
POST /transactions/oldest-pending
{
"map": function(txn) {
if (txn.status == "pending") {
emit(txn._id, txn);
}
},
"reduce": function(keys, values) {
var oldest = values[0];
values.forEach(function(txn) {
if (txn.timestamp < oldest) {
oldest = txn;
}
});
return oldest;
}
}
转移列表现在可能如下所示:
{"from": "Alex", "to": "Dave", "amount": 100, "timestamp": 50, "status": "successful"}
{"from": "Dave", "to": "Jane", "amount": 200, "timestamp": 60, "status": "pending"}
接下来,应用程序将需要一个函数,该函数可以通过检查每个待处理的交易来解决交易,以验证它是否有效,然后将其状态从“待处理”更新为“成功”或“拒绝”:
def resolve_transactions(target_timestamp):
""" Resolves all transactions up to and including the transaction
with timestamp `target_timestamp`. """
while True:
# Get the oldest transaction which is still pending
txn = db.get("transactions/oldest-pending")
if txn.timestamp > target_timestamp:
# Stop once all of the transactions up until the one we're
# interested in have been resolved.
break
# Then check to see if that transaction is valid
if db.get("transactions/available-balance", id=txn.from) >= txn.amount:
status = "successful"
else:
status = "rejected"
# Then update the status of that transaction. Note that CouchDB
# will check the "_rev" field, only performing the update if the
# transaction hasn't already been updated.
txn.status = status
couch.put(txn)
最后,正确执行传输的应用程序代码:
def transfer(from_acct, to_acct, amount):
timestamp = time.time()
txn = db.post("transactions", {
"from": from_acct,
"to": to_acct,
"amount": amount,
"status": "pending",
"timestamp": timestamp,
})
resolve_transactions(timestamp)
txn = couch.get("transactions/" + txn._id)
if txn_status == "rejected":
raise InsufficientFunds()
几点注意事项:
为简洁起见,这个特定的实现假设了 CouchDB 的 map-reduce 中的一些原子性。更新代码使其不依赖于该假设作为练习留给读者。
没有考虑主/主复制或 CouchDB 的文档同步。主/主复制和同步使这个问题变得更加困难。
在实际系统中,使用time()
可能会导致冲突,因此使用具有更多熵的东西可能是个好主意;也许,或者在排序中"%s-%s"
%(time(), uuid())
使用文档。_id
包括时间并不是绝对必要的,但如果多个请求大约同时进入,它有助于保持逻辑。