3

我是 Go 和 Go 并发的新手。一旦找到具有给定 ID 的成员,我正在尝试使用 Go 上下文来取消一组 Go 例程。

一个组存储一个客户列表,每个客户都有一个成员列表。我想并行搜索所有客户及其所有成员,以找到具有给定 ID 的成员。一旦找到此成员,我想取消所有其他 Go 例程并返回发现的成员。

我尝试了以下实现,使用 context.WithCancel 和 WaitGroup。

然而,这不起作用,并且无限期地挂起,永远不会越过waitGroup.Wait(),但我不确定为什么。

func (group *Group) MemberWithID(ID string) (*models.Member, error) {
    found := make(chan *models.Member)
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    var waitGroup sync.WaitGroup

    for _, client := range group.Clients {
        waitGroup.Add(1)

        go func(clientToQuery Client) {
            defer waitGroup.Done()

            select {
            case <-ctx.Done():
                return
            default:
            }

            member, _ := client.ClientMemberWithID(ID)
            if member != nil {
                found <- member
                cancel()
                return
            }

        } (client)

    }

    waitGroup.Wait()

    if len(found) > 0 {
        return <-found, nil
    }

    return nil, fmt.Errorf("no member found with given id")
}
4

2 回答 2

4

found是一个无缓冲的通道,所以在它上面发送会阻塞,直到有人准备好接收它。

您的main()函数将是从中接收的函数,但仅在waitGroup.Wait()返回之后。但这将阻塞,直到所有启动的 goroutine 调用waitGroup.Done(). 但这在他们返回之前不会发生,直到他们可以发送时才会发生found。这是一个僵局。

如果您更改found为缓冲,即使main()尚未准备好从中接收值(与缓冲区一样多的值),也将允许在其上发送值。

但是你应该收到found之前的waitGroup.Wait()回报。

另一种解决方案是使用 1 的缓冲区found,并使用非阻塞发送found。这样,第一个(最快的)goroutine 将能够发送结果,其余的(假设我们使用的是非阻塞发送)将简单地跳过发送。

另请注意,它应该是main()that 调用cancel(),而不是每个单独启动的 goroutine。

于 2021-09-02T13:25:35.140 回答
4

对于这种用例,我认为 async.Once可能比通道更适合。当你找到第一个非零成员时,你想做两件不同的事情:

  1. 记录您找到的成员。
  2. 取消剩余的 goroutine。

缓冲通道可以轻松完成 (1),但会使 (2) 更复杂一些。但是 async.Once非常适合在第一次发生有趣的事情时做两件不同的事情!


我还建议汇总重要错误,以便您可以报告比no member found数据库连接失败或发生其他重要错误更有用的信息。你也可以使用 a sync.Once


把它们放在一起,我想看到这样的东西(https://play.golang.org/p/QZXUUnbxOv5):

func (group *Group) MemberWithID(ctx context.Context, id string) (*Member, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var (
        wg sync.WaitGroup

        member    *Member
        foundOnce sync.Once

        firstNontrivialErr error
        errOnce            sync.Once
    )

    for _, client := range group.Clients {
        wg.Add(1)
        client := client // https://golang.org/doc/faq#closures_and_goroutines
        go func() {
            defer wg.Done()

            m, err := client.ClientMemberWithID(ctx, id)
            if m != nil {
                foundOnce.Do(func() {
                    member = m
                    cancel()
                })
            } else if nf := (*MemberNotFoundError)(nil); !errors.As(err, &nf) {
                errOnce.Do(func() {
                    firstNontrivialErr = err
                })
            }
        }()
    }
    wg.Wait()

    if member == nil {
        if firstNontrivialErr != nil {
            return nil, firstNontrivialErr
        }
        return nil, &MemberNotFoundError{ID: id}
    }
    return member, nil
}
于 2021-09-02T14:06:21.330 回答