0

我有一个包含数千个 ID 的通道,需要在 goroutines 中并行处理。我如何实现一个锁,以便 goroutines 不能同时处理相同的 id,是否应该在通道中重复?

package main

import (
    "fmt"
    "sync"
    "strconv"
    "time"
)

var wg sync.WaitGroup

func main() {
    var data []string
    for d := 0; d < 30; d++ {
        data = append(data, "id1")
        data = append(data, "id2")
        data = append(data, "id3")
    }

    chanData := createChan(data)    


    for i := 0; i < 10; i++ {
        wg.Add(1)
        process(chanData, i)
    }

    wg.Wait()
}

func createChan(data []string) <-chan string {
    var out = make(chan string)
    go func() {
        for _, val := range data {
            out <- val
        }
    close(out)
    }()
    return out
}

func process(ids <-chan string, i int) {
    go func() {
        defer wg.Done()
        for id := range ids {
            fmt.Println(id + " (goroutine " + strconv.Itoa(i) + ")")
            time.Sleep(1 * time.Second)
        }
    }()
}

--edit: 所有值都需要以任意顺序处理,但“id1,”id2”和”id3”需要阻塞,因此它们不能被多个goroutine同时处理。

4

3 回答 3

1

这里最简单的解决方案是根本不发送重复值,然后不需要同步。

func createChan(data []string) <-chan string {
    seen := make(map[string]bool)
    var out = make(chan string)
    go func() {
        for _, val := range data {
            if seen[val] {
                continue
            }
            seen[val] = true
            out <- val
        }
        close(out)
    }()
    return out
}
于 2018-06-21T17:38:06.080 回答
0

根据定义,您所说的问题很困难,我的第一选择是重新设计应用程序以避免它,但如果这不是一个选项:

首先,我假设如果一个给定的 ID 重复,你仍然希望它处理两次,但不是并行处理(如果不是这种情况并且必须忽略第二个实例,它变得更加困难,因为你必须记住你的每个ID已经永远处理了,所以你不要在它上面运行任务两次)。

为了实现您的目标,您必须跟踪在 goroutine 中被执行的每个 ID - gomap 是您最好的选择(请注意,它的大小将增长到与您并行旋转的 goroutine 一样多!)。映射本身必须由锁保护,因为它是从多个 goroutine 修改的。

我要进行的另一个简化是,如果发现当前由另一个 gorotuine 处理,则可以将从通道中删除的 ID 添加回通道中。然后,我们需要map[string]bool作为追踪器,加上一个sync.Mutex来守护它。为简单起见,我假设 map、mutex 和 channel 是全局变量;但这对您来说可能不方便 - 安排您认为合适的访问权限(goroutine 的参数、闭包等)。

import "sync"

var idmap map[string]bool
var mtx sync.Mutex
var queue chan string

func process_one_id(id string) {
busy := false
mtx.Lock()
if idmap[id] {
        busy = true
    } else {
        idmap[id] = true
    }
    mtx.Unlock()
    if busy { // put the ID back on the queue and exit
        queue <- id
        return
    }
    // ensure the 'busy' mark is cleared at the end:
    defer func() { mtx.Lock(); delete(idmap, id); mtx.Unlock() }()
    // do your processing here
    // ....

}
于 2018-06-25T11:26:37.850 回答
0

我找到了解决方案。有人写了一个包(github.com/EagleChen/mapmutex)来做我需要的:

package main

import (
    "fmt"
    "github.com/EagleChen/mapmutex"
    "strconv"
    "sync"
    "time"
)

var wg sync.WaitGroup
var mutex *mapmutex.Mutex

func main() {

    mutex = mapmutex.NewMapMutex()

    var data []string
    for d := 0; d < 30; d++ {
        data = append(data, "id1")
        data = append(data, "id2")
        data = append(data, "id3")
    }

    chanData := createChan(data)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        process(chanData, i)
    }

    wg.Wait()
}

func createChan(data []string) <-chan string {
    var out = make(chan string)
    go func() {
        for _, val := range data {
            out <- val
        }
        close(out)
    }()
    return out
}

func process(ids <-chan string, i int) {
    go func() {
        defer wg.Done()
        for id := range ids {
            if mutex.TryLock(id) {
                fmt.Println(id + " (goroutine " + strconv.Itoa(i) + ")")
                time.Sleep(1 * time.Second)
                mutex.Unlock(id)
            }
        }
    }()
}
于 2018-06-25T11:12:58.200 回答