为了更多地了解 Golang,我正在尝试从在线 5v5 视频游戏中获取比赛数据。
根据给定的玩家,我将请求他的匹配列表,并根据此匹配列表中的游戏 ID,我请求每个单独匹配的元数据。对这些比赛的所有参与者都应该这样做。
在工人范式术语中:从事 1 个工作可以创建多达 9 个额外的工作。
在这样做的过程中,我并行化这些调用工人 goroutine 的努力的方式在告诉工人何时停止方面似乎是不可靠的。
为此,我使用了一个终止标准,并让一个 goroutine 定期检查它。如果满足标准(例如 100 场比赛,或 10 名玩家从其中获取所有比赛),则应转发取消信号,告诉工人停止。
问题是,在当前的实施中,停止是非常不可靠的。通常,我最终会得到比定义更多的比赛或球员。在其他情况下,工作例程将简单地休眠并且永远不会完成。
当满足终止条件而不会超调时,如何实现停止所有 goroutine 的可靠方法?
“QueuePlayers”将所有接收到的参与者存储到一个数据结构中,告诉其他 goroutine 从哪个玩家抓取游戏,并定期检查是否满足终止标准。
“Crawl Players”获取给定玩家的匹配列表,获取其所有匹配并转发每个匹配“QueuePlayers”的参与者。在处理完一个玩家之后,表示它已经准备好让下一个玩家开始工作了。
func (c *Crawler) Start() {
playerChan := make(chan string, c.concurrency)
participants := make(chan []string, c.concurrency)
ready := make(chan struct{}{}, c.concurrency)
defer close(ready)
defer close(playerChan)
defer close(participants)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sp, err := c.GetPlayerByName(c.startPlayer)
if err != nil {
log.Infof("Erronous Start Player given!")
return
}
// spawn worker and do work
for i := 0; i < c.concurrency; i++ {
go c.CrawlPlayer(ctx, i+1, playerChan, ready, participants, &wg)
wg.Add(1)
}
// Store Participants and queue next Players to crawl Matches from.
// Keep track of the number of matches and players fetched and terminate everything
// when the termination criterion has been met
go c.QueuePlayers(ctx, cancel, ready, participants, playerChan, &wg)
c.store.AddToQueue(sp.Puuid)
ready <- struct{}{}
wg.Wait()
log.Infof("Finished")
}
func (c *Crawler) QueuePlayers(ctx context.Context, cancel func(), ready <-chan Void, participants chan []string, player chan<- string, wg *sync.WaitGroup) {
for {
select {
case p := <-participants:
for _, player := range p {
if c.store.IsPlayerKnown(player) {
log.Infof("Player %s already known", player)
continue
}
c.store.AddToQueue(player)
log.Infof("New Player %s pushed into Queue", player)
}
case <-ready:
// Termination Crteria
if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
// In case the other workers are idling, specifically at the very beginning where only one player can be worked on
for i := 0; i < (c.concurrency - len(c.playerChan)); i++ {
next, err := c.store.NextPlayer()
if err != nil {
log.Error(err)
continue
}
player <- next
log.Infof("Next player in line: %s", next)
}
} else {
cancel()
return
}
}
}
}
func (c *Crawler) CrawlPlayer(ctx context.Context, workerID int, playerChan <-chan string, ready chan<- Void, participants chan<- []string, wg *sync.WaitGroup) {
log.Printf("[WorkerID:%v]:Goroutine started", workerID)
OUTER:
for {
select {
case <-ctx.Done():
log.Printf("[WorkerID:%v]: Goroutine finished", workerID)
wg.Done()
return
case player := <-playerChan:
ml, err := c.GetMatchList(player)
if err != nil {
log.Infof("Error occured for Player", player)
continue
}
// 2. Process each match
INNER:
for _, m := range *ml {
// Termination Criteria
if !(c.store.NumMatches() >= c.totalNumberOfMatches || c.store.NumPlayers() >= c.TotalNumberOfPlayers) {
match, err := c.GetMatch(m)
if err != nil {
log.Errorf("Error fetching match %v", m)
continue INNER
}
// Handle Match
c.dbm.InsertMatch(*match)
c.store.ConfirmMatch(match.MetaData.MatchID)
participants <- match.MetaData.Participants
} else {
ready <- struct{}{}
continue OUTER
}
}
// Finish up current player and get next player zu process
summoner, err := c.GetPlayerByID(player)
c.dbm.InsertPlayer(*summoner)
c.store.ConfirmPlayer(player)
ready <- struct{}{}
}
}
}
为了定期检查终止标准,我尝试设置一个线程安全结构,该结构需要一个映射并将其添加为 Crawler 结构的成员。
type CacheType struct {
mux sync.RWMutex
Cache map[string]struct{}
}
type Store struct {
Match *CacheType
PlayerKnown *CacheType
mux sync.Mutex
PlayerQueue *list.List
}
// AddToQueue inserts a player at the back of the queue of players to process
func (s *Store) AddToQueue(player string) {
s.mux.Lock()
defer s.mux.Unlock()
s.PlayerQueue.PushBack(player)
}
func (s *Store) NumMatches() int {
s.mux.Lock()
defer s.mux.Unlock()
return len(s.Match.Cache)
}
// NumPlayers returns the players of Matches crawled
func (s *Store) NumPlayers() int {
s.mux.Lock()
defer s.mux.Unlock()
return len(s.PlayerKnown.Cache)
}
// ConfirmMatch inserts GameId into Sink
func (s *Store) ConfirmMatch(id string) {
s.mux.Lock()
defer s.mux.Unlock()
s.Match.Cache[id] = struct{}{}
}
// ConfirmPlayer inserts IDs of players into SInk
func (s *Store) ConfirmPlayer(id string) {
s.mux.Lock()
defer s.mux.Unlock()
s.PlayerKnown.Cache[id] = struct{}{}
}
// MatchExists checks if a match had been inserted to the sink
func (s *Store) MatchExists(id string) bool {
s.mux.Lock()
defer s.mux.Unlock()
_, ok := s.Match.Cache[id]
return ok
}
// IsPlayerKnown checks if a match had been inserted to the sink
func (s *Store) IsPlayerKnown(id string) bool {
s.mux.Lock()
defer s.mux.Unlock()
_, ok := s.PlayerKnown.Cache[id]
return ok
}