0

我有一个依赖于同时检查一些错误的函数,我正在尝试使用等待组来等待所有返回可能错误的进程都完成,然后再检查所有错误。

它似乎正在跳过一些 wg.Done() cals。这是调试的 youtube 视频(它循环了 3 次“for”循环,抱歉): Golang Delve Debug for WaitGroups

知道为什么它会跳过一些 waitgroup.Done() 调用吗?

这是代码:

package controllers

import (
    "errors"
    "mobilebid/billable"
    db "mobilebid/database"
    "mobilebid/stripe"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/gorilla/mux"
)

var (
    errBillableID     = errors.New("It looks like there was an error while getting your billable ID. Do you have a credit card set up?")
    errWinningItems   = errors.New("It looks like there was an error while gathering your winning items. Please contact an event rep.")
    errAcctInfo       = errors.New("We had some trouble getting the account information for the event. Please contact an event rep.")
    errLoggingTrans   = errors.New("It looks like we had some sort of issue while logging your transaction. Please contact an event rep.")
    errParsingURL     = errors.New("We had some issue looking at the URL.")
    errStripeIssue    = errors.New("It looks like there was some kind of issue while talking with Stripe. If you were in the middle of a transaction, this doesn't mean the transaction was cancelled. Take a look at your transactions and/or contact an event rep.")
    errItemsPurchased = errors.New("One or more of the items you're trying to purchase have already been purchased. If this doesn't sound right, please contact an event rep.")
)

func createLogCtx(bidderID, eventID int) *log.Entry {
    return log.WithFields(log.Fields{
        "bidderID": bidderID,
        "eventID":  eventID,
    })
}

var wg sync.WaitGroup

const gorutineCt = 6

//PurchaseItems purchases items from the event for the bidder and sends the funds to the customer
//  In order for PurchaseItems to work:
//      1. Bidder must have a customer account set up in Stripe
//      2. Event owner needs to have their Stripe registered with the apps Stripe account
//      3. Item must not have been purchased before (ever)
func PurchaseItems(dB db.AppDB) http.HandlerFunc {
    return http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {

        ps := mux.Vars(req)

        eventID, err := strconv.Atoi(ps["eventID"])
        if err != nil {
            log.Error(err.Error())
            res.Write(ResErr(errParsingURL.Error()))
            return
        }

        bidderID, err := strconv.Atoi(ps["bidderID"])
        if err != nil {
            log.Error(err.Error())
            res.Write(ResErr(errParsingURL.Error()))
            return
        }

        itemsChan := make(chan []db.ItemWon)
        billableBidderIDChan := make(chan string)
        creditableAcctChan := make(chan string)
        errsChan := make(chan error, gorutineCt)
        wg.Add(gorutineCt)

        logCtx := createLogCtx(bidderID, eventID)

        acct := stripe.New(os.Getenv("SECRET_KEY"), os.Getenv("PUBLISHABLE_KEY"))

        go func() {
            id, e := dB.GetBidderBillableID(bidderID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errBillableID
                billableBidderIDChan <- id
            } else {
                errsChan <- nil
                billableBidderIDChan <- id
            }
            wg.Done()
        }()

        go func() {
            i, e := dB.GetWinningItemsForBidder(bidderID, eventID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errWinningItems
                itemsChan <- i
            } else {
                errsChan <- nil
                itemsChan <- i
            }
            wg.Done()
        }()

        go func() {
            a, e := dB.GetCreditableAccountFromEvent(eventID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errAcctInfo
                creditableAcctChan <- a
            } else {
                errsChan <- nil
                creditableAcctChan <- a
            }
            wg.Done()
        }()

        go func() {
            items := <-itemsChan
            for _, val := range items {
                e := dB.CheckIfItemPurchased(val.ItemID)
                if e != nil {
                    logCtx.WithFields(log.Fields{
                        "itemID":     val.ItemID,
                        "_timestamp": time.Now(),
                    }).Error(e.Error())
                    errsChan <- errItemsPurchased
                    itemsChan <- items
                    wg.Done()
                    return
                }
            }
            errsChan <- nil
            itemsChan <- items
            wg.Done() //SKIPPED
        }()

        go func() {
            billableBidderID := <-billableBidderIDChan
            e := acct.BuyerIsBillable(billableBidderID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errStripeIssue
                billableBidderIDChan <- billableBidderID
            } else {
                errsChan <- nil
                billableBidderIDChan <- billableBidderID
            }
            wg.Done()
        }()

        go func() {
            creditableAcct := <-creditableAcctChan
            e := acct.CanReceiveFunds(creditableAcct)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errStripeIssue
                creditableAcctChan <- creditableAcct
            } else {
                errsChan <- nil
                creditableAcctChan <- creditableAcct
            }
            wg.Done()
        }()

        wg.Wait()
        close(errsChan)

        if err = checkConcurrentErrs(errsChan); err != nil {
            logCtx.Error(err.Error())
            res.Write(ResErr(err.Error()))
            return
        }

        items := <-itemsChan
        amount := addItems(items)
        appFee := calculateFee(amount, .03) //TODO: Store this somewhere where it can be edited without having to restart the app.

        invoice := billable.BillObject{
            Desc:     "Test Charge", //TODO: Generate this description from the event, items and bidder somehow.
            Amount:   amount,
            Currency: "usd",
            Dest:     <-creditableAcctChan,
            Fee:      appFee,
            Meta:     createItemsList(items),
            Customer: <-billableBidderIDChan,
        }

        trans, err := acct.ChargeBidder(invoice)
        if err != nil {
            logCtx.Error(err.Error())
            res.Write(ResErr(errStripeIssue.Error()))
            return
        }

        logCtx.WithFields(log.Fields{
            "stripeTransID": trans.TransID,
            "itemcCount":    len(items),
        }).Info("Transferred funds from bidder to client")

        dbTrans := db.Transaction{
            TransID:  trans.TransID,
            UserID:   5,
            BidderID: bidderID,
            EventID:  eventID,
            Amount:   int64(amount),
            AppFee:   int64(appFee),
            Desc:     "Some test order",
            Status:   "completed",
        }

        orderID, err := dB.InsertTransaction(dbTrans)
        if err != nil {
            logCtx.WithFields(log.Fields{
                "stripeTransID": dbTrans.TransID,
                "_timestamp":    time.Now(),
            }).Error(err.Error())
            res.Write(ResErr(errLoggingTrans.Error()))
            return
        }

        for it, val := range items {
            i := db.TransactionLine{
                OrderID: orderID,
                ItemID:  val.ItemID,
                Amount:  uint64(val.Bid * 100), //Must do this since the bid is in dollars but the amount is pennies
                Line:    it,
            }

            err := dB.InsertTransactionLine(i)
            if err != nil {
                logCtx.WithFields(log.Fields{
                    "stripeTransID": dbTrans.TransID,
                    "lineNumber":    i,
                    "_timestamp":    time.Now(),
                }).Error(err.Error())
                res.Write(ResErr(errLoggingTrans.Error()))
                return
            }
        }

        logCtx.WithField("orderID", orderID).Info("Order created")

        //TODO: Send receipt to buyer.
        res.Write(ResOK(trans.TransID))

    })
}
4

1 回答 1

4

为了后代(和谷歌搜索):

放在wg.Add(1)每一go func()行之前,而不是一次性完成wg.Add(gorutineCt)

放在defer wg.Done()每个 goroutine 外壳的开头,而不是wg.Done()在每个退出案例中调用。这确保wg.Done()无论如何都可以运行。

使用更接近的例程,而不是尝试充分缓冲通道:

// start other goroutines

go func () {
    wg.Wait()
    close(errschan)
}

for _, err := range errsChan { // automatically terminates once chan is closed
    if err != nil {
         // handle err
    }
}
于 2016-06-21T15:56:22.110 回答