0

我正在尝试在 mongodb 中构建事务,其重试功能类似于 nodejs 等其他驱动程序。这是我当前的实现

      if session, err = client.StartSession(); err != nil {
        return err
      }
      if err = session.StartTransaction(); err != nil {
        return err
      }
      if err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
      if result, err = collection.UpdateOne(sc, bson.M{"_id": id}, update); err != nil {
        _ =  session.AbortTransaction(sc)
        return err
      }
      if result.MatchedCount != 1 || result.ModifiedCount != 1 {
        _ =  session.AbortTransaction(sc)
        return error.New("no match")
      }


      if err = session.CommitTransaction(sc); err != nil {
        _ =  session.AbortTransaction(sc)
      }
      return nil
    }); err != nil {
      // what needs to be handled here?
      // If its a particular error type can i retry transaction here?
      // should i abort transaction here?
      return err
    }
    session.EndSession(ctx)

如果它是一个特定的错误类型,我该如何重试事务?

此外,我们是否需要在每次提交失败或返回错误时自动取消事务时中止事务?

我找不到很多关于如何实施这项权利的例子

4

1 回答 1

0

在做了一些研究之后,我可以提出一个类似于其他 mongo 驱动程序的示例。

func trans() error {
        transactionCodeFunc := func(sctx mongo.SessionContext) error {

            err := sctx.StartTransaction(options.Transaction().
                SetReadConcern(readconcern.Snapshot()).
                SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
            )
            if err != nil {
                return err
            }
            // Transaction - Attempt an stripe charge
            err = empService.Update(sctx, filter, update)
            if err != nil {
                sctx.AbortTransaction(sctx)
                return err
            }
            // Transaction - Attempt an stripe charge
            err = empService.Update(sctx, filter2, update2)
            if err != nil {
                sctx.AbortTransaction(sctx)
                return err
            }
            return commitWithRetry(sctx)
    }
        return db.Client().UseSessionWithOptions(
            ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
            func(sctx mongo.SessionContext) error {
                return runTransactionWithRetry(sctx, transactionCodeFunc)
            },
        )
}

    func runTransactionWithRetry(sctx mongo.SessionContext, txnFn 
      func(mongo.SessionContext) error) error {
        for {
            err := txnFn(sctx) // Performs transaction.
            if err == nil {
                return nil
            }

            log.Println("Transaction aborted. Caught exception during transaction.")

            // If transient error, retry the whole transaction
            if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
                log.Println("TransientTransactionError, retrying transaction...")
                continue
            }
            return err
        }
    }

    func commitWithRetry(sctx mongo.SessionContext) error {
        for {
            err := sctx.CommitTransaction(sctx)
            switch e := err.(type) {
            case nil:
                fmt.Println("Transaction comitted")
                return nil
            case mongo.CommandError:
                if e.HasErrorLabel("UnknownTransactionCommitResult") {
                    log.Println("UnknownTransactionCommitResult, retrying commit operation")
                    continue
                }
                log.Println("Error during commit...")
                return e
            default:
                log.Println("Error during commit...")
                return e
            }
        }
    }
于 2019-09-13T01:12:39.917 回答