0

为了更多地了解 Golang,我正在尝试从在线 5v5 视频游戏中获取比赛数据。

根据给定的玩家,我将请求他的匹配列表,并根据此匹配列表中的游戏 ID,我请求每个单独匹配的元数据。对这些比赛的所有参与者都应该这样做。

在工人范式术语中:从事 1 个工作可以创建多达 9 个额外的工作。

在这样做的过程中,我并行化这些调用工人 goroutine 的努力的方式在告诉工人何时停止方面似乎是不可靠的。

为此,我使用了一个终止标准,并让一个 goroutine 定期检查它。如果满足标准(例如 100 场比赛,或 10 名玩家从其中获取所有比赛),则应转发取消信号,告诉工人停止。

问题是,在当前的实施中,停止是非常不可靠的。通常,我最终会得到比定义更多的比赛或球员。在其他情况下,工作例程将简单地休眠并且永远不会完成。

当满足终止条件而不会超调时,如何实现停止所有 goroutine 的可靠方法?


“QueuePlayers”将所有接收到的参与者存储到一个数据结构中,告诉其他 goroutine 从哪个玩家抓取游戏,并定期检查是否满足终止标准。

“Crawl Players”获取给定玩家的匹配列表,获取其所有匹配并转发每个匹配“QueuePlayers”的参与者。在处理完一个玩家之后,表示它已经准备好让下一个玩家开始工作了。

func (c *Crawler) Start() {
    playerChan := make(chan string, c.concurrency)
    participants := make(chan []string, c.concurrency)
    ready := make(chan struct{}{}, c.concurrency)
    defer close(ready)
    defer close(playerChan)
    defer close(participants)
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sp, err := c.GetPlayerByName(c.startPlayer)
    if err != nil {
        log.Infof("Erronous Start Player given!")
        return
    }

    // spawn worker and do work
    for i := 0; i < c.concurrency; i++ {
        go c.CrawlPlayer(ctx, i+1, playerChan, ready, participants, &wg)
        wg.Add(1)
    }

    // Store Participants and queue next Players to crawl Matches from.
    // Keep track of the number of matches and players fetched and terminate everything
    // when the termination criterion has been met
    go c.QueuePlayers(ctx, cancel, ready, participants, playerChan, &wg)

    c.store.AddToQueue(sp.Puuid)
    ready <- struct{}{}

    wg.Wait()
    log.Infof("Finished")
}




 func (c *Crawler) QueuePlayers(ctx context.Context, cancel func(), ready <-chan Void, participants chan []string, player chan<- string, wg *sync.WaitGroup) {
        for {
            select {
            case p := <-participants:
                for _, player := range p {
                    if c.store.IsPlayerKnown(player) {
                        log.Infof("Player %s already known", player)
                        continue
                    }
                    c.store.AddToQueue(player)
                    log.Infof("New Player %s pushed into Queue", player)
                }
            case <-ready:
                // Termination Crteria
                if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
                    // In case the other workers are idling, specifically at the very beginning where only one player can be worked on
                    for i := 0; i < (c.concurrency - len(c.playerChan)); i++ {
                        next, err := c.store.NextPlayer()
                        if err != nil {
                            log.Error(err)
                            continue
                        }
                        player <- next
                        log.Infof("Next player in line: %s", next)
                    }
                } else {
                    cancel()
                    return
                }
            }
        }
    }

    
func (c *Crawler) CrawlPlayer(ctx context.Context, workerID int, playerChan <-chan string, ready chan<- Void, participants chan<- []string, wg *sync.WaitGroup) {
    log.Printf("[WorkerID:%v]:Goroutine started", workerID)
OUTER:
    for {
        select {
        case <-ctx.Done():
            log.Printf("[WorkerID:%v]: Goroutine finished", workerID)
            wg.Done()
            return
        case player := <-playerChan:
            ml, err := c.GetMatchList(player)
            if err != nil {
                log.Infof("Error occured for Player", player)
                continue
            }
            // 2. Process each match
        INNER:
            for _, m := range *ml {
                // Termination Criteria
                if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers)  {
                    match, err := c.GetMatch(m)
                    if err != nil {
                        log.Errorf("Error fetching match %v", m)
                        continue INNER
                    }
                    // Handle Match
                    c.dbm.InsertMatch(*match)
                    c.store.ConfirmMatch(match.MetaData.MatchID)
                    participants <- match.MetaData.Participants
                } else {
                    ready <- struct{}{}
                    continue OUTER
                }
            }
            // Finish up current player and get next player zu process
            summoner, err := c.GetPlayerByID(player)
            c.dbm.InsertPlayer(*summoner)
            c.store.ConfirmPlayer(player)
            ready <- struct{}{}
        }
    }
}

为了定期检查终止标准,我尝试设置一个线程安全结构,该结构需要一个映射并将其添加为 Crawler 结构的成员。

type CacheType struct {
    mux   sync.RWMutex
    Cache map[string]struct{}
}

type Store struct {
    Match       *CacheType
    PlayerKnown *CacheType
    mux         sync.Mutex
    PlayerQueue *list.List
}


// AddToQueue inserts a player at the back of the queue of players to process
func (s *Store) AddToQueue(player string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerQueue.PushBack(player)
}


func (s *Store) NumMatches() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.Match.Cache)
}

// NumPlayers returns the players of Matches crawled
func (s *Store) NumPlayers() int {
    s.mux.Lock()
    defer s.mux.Unlock()
    return len(s.PlayerKnown.Cache)
}

// ConfirmMatch inserts GameId into Sink
func (s *Store) ConfirmMatch(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.Match.Cache[id] = struct{}{}
}

// ConfirmPlayer inserts IDs of players into SInk
func (s *Store) ConfirmPlayer(id string) {
    s.mux.Lock()
    defer s.mux.Unlock()
    s.PlayerKnown.Cache[id] = struct{}{}
}

// MatchExists checks if a match had been inserted to the sink
func (s *Store) MatchExists(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.Match.Cache[id]
    return ok
}

// IsPlayerKnown checks if a match had been inserted to the sink
func (s *Store) IsPlayerKnown(id string) bool {
    s.mux.Lock()
    defer s.mux.Unlock()
    _, ok := s.PlayerKnown.Cache[id]
    return ok
}
4

0 回答 0