问题:
- 我写了一个
Sell()
函数来减少用户购物车中清单商品的库存。假设这些商品在一秒钟内会有数千个请求。由于我们需要提高吞吐量,并确保服务不会因压力或网络错误等而关闭。我们将库存服务部署在多个服务器中。这里我们有一个用于库存服务的数据库。
- 我们需要交易,因为我们有一份正在处理的货物清单,如果其中一件货物未能减少库存,我们必须回滚。
- 我们需要一个Redis 分布式锁来确保数据的一致性。一次只有一个线程/例程会访问/更改特定商品的信息。
由于我们需要将循环放在和之间start transaction
,commit()
以确保事务的属性。我们必须在事务之前锁定并在提交之后解锁。但这会使事务序列化。我正在尝试使用细粒度的 Redis 锁,基本上每个商品都使用其 ID 生成锁。显然我的代码是错误的。我知道可能还有其他选项,例如乐观锁悲观锁。但是我如何用 Redis 锁来实现这个想法呢?
我将其简化如下:
// assume no lock or unlock error, stock is enough, goodsID exist
tx := DB.Begin() // begin transaction
loop a list of goods in shop cart
for _, good = range [] goods {
mutex := GetRedisLock(good.ID) // for each item, get a distributed Redis lock by its name
mutex.lock()
inv := DB.Getby(goodID) // get inventory by goodID
inv.Stock -= good.ReductionNum // do stock reduction for this item
tx.Save(inv) // save current value to database
mutex.Unlock()
}
tx.Commit()
go中的演示代码:
func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client)
rs := redsync.New(pool)
// start transaction
tx := global.DB.Begin()
for _, goodInfo := range req.GoodsInfo {
var inv model.Inventory
// create a new distributed lock for each goodID which needs to do reduction
mutex := rs.NewMutex(fmt.Sprintf("goodsID:%d", goodInfo.GoodsId))
if err := mutex.Lock(); err != nil {
return nil, status.Errorf(codes.Internal, "get redis lock error")
}
if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
tx.Rollback()
return nil, status.Errorf(codes.NotFound, "no stock info")
}
if inv.Stocks < goodInfo.Num {
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "not enough stock")
}
inv.Stocks -= goodInfo.Num
tx.Save(&inv)
if ok, err := mutex.Unlock(); !ok || err != nil {
return nil, status.Errorf(codes.Internal, "release redis lock error")
}
}
tx.Commit()
return &emptypb.Empty{}, nil
}