3

I have a test program that gives different results when it runs more than one goroutine on more than one CPU (goroutines = CPUs). The "test" is about syncing goroutines using channels, and the program itself counts occurrences of chars in strings. It produces consistent results on one CPU / one goroutine.

See code example on playground (Note: Run on local machine to execute on multi core, and watch the resulting numbers vary): http://play.golang.org/p/PT5jeCKgBv .

Code summary: The program counts occurrences of 4 different chars (A, T, G, C) in (DNA) strings.

Problem: The result (number of occurrences of chars) varies when executed on multiple CPUs (goroutines). Why?

Description:

  1. A goroutine spawns work (SpawnWork) as strings to Workers. It sets up artificial string input data (hardcoded strings are copied n times).
  2. Worker goroutines (Worker) are created, one per CPU.
  3. Each Worker checks every char in a string, counts the A's and T's and sends the sum into one channel, and sends the G and C count into another channel.
  4. SpawnWork closes the work-string channel in order to control the Workers (which consume strings using range, which ends when the input channel is closed by SpawnWork).
  5. When a Worker has consumed its range (of strings), it sends a quit signal on the quit channel (quit <- true). These "pulses" will occur CPU-count times (CPU count = goroutine count).
  6. The main (select) loop quits when it has received CPU-count quit signals.
  7. The main func prints a summary of the occurrences of chars (A's and T's, G's and C's).

Simplified code:

1. "Worker" (goroutines) counting chars in lines:

// Worker receives pointers to byte slices from inCh, counts the 'A'/'T' bytes
// and the 'G'/'C' bytes, and sends the addresses of the two counters on resA
// and resB. When inCh is closed it sends true on quit and returns.
//
// NOTE(review): at and gc are not declared anywhere in this excerpt, so they
// are presumably declared outside the receive loop (or outside the function)
// in the full program. Nothing here resets them per line, and only their
// addresses are sent, so the receiver reads whatever value they hold when it
// dereferences. Confirm against the full program.
func Worker(inCh chan *[]byte, resA chan<- *int, resB chan<- *int, quit chan bool) {
    //for p_ch := range inCh {
    for {
        p_ch, ok := <-inCh // similar to range: ok is false once inCh is closed and drained
        if ok {
            ch := *p_ch
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {        // Count A:s and T:s
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' { // Count G:s and C:s
                    gc++
                }
            }
            resA <- &at  // Send the address of the A/T counter on its own channel
            resB <- &gc  // Send the address of the G/C counter on its own channel
        } else {
            quit <- true // Indicate that we're all done
            break
        }
    }
}

2. Spawn work (strings) to workers:

// SpawnWork sends a pointer to each scanned line on inStr, skipping empty
// lines and lines that start with '>', and closes inStr when the scanner is
// exhausted. The quit parameter is not used in the visible part of the body.
//
// This excerpt is elided: the end of the StringData expression and the
// declarations of scanner and i are not shown.
//
// NOTE(review): s is the slice returned by scanner.Bytes and &s is what gets
// sent; bufio documents that the array behind Bytes may be overwritten by a
// later Scan, so receivers may see changed data. Confirm against the bufio
// documentation.
func SpawnWork(inStr chan<- *[]byte, quit chan bool) {
    // Artificial input data
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
        "... etc\n" +
    // ... (rest of the input data and the scanner setup are omitted here)
    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            i++
            inStr <- &s
        }
    }
    close(inStr) // Indicate (to Workers) that there's no more strings coming.
}

3. Main routine:

// main starts one Worker goroutine per CPU plus one SpawnWork goroutine, then
// sums the per-line results arriving on resChA/resChB until it has received
// one quit signal per Worker.
//
// A and B are not declared in this excerpt and the final report is elided.
func main() {
    // Count Cpus, and count down in final select clause
    CpuCnt := runtime.NumCPU() 
    runtime.GOMAXPROCS(CpuCnt)
    // Make channels (all unbuffered)
    resChA := make(chan *int)
    resChB := make(chan *int)
    quit := make(chan bool)
    inStr := make(chan *[]byte)

    // Set up Workers ( n = Cpu )
    for i := 0; i < CpuCnt; i++ {
        go Worker(inStr, resChA, resChB, quit)
    }
    // Send lines to Workers
    go SpawnWork(inStr, quit)

    // Count the number of "A","T" & "G","C" per line
    // (comes in here as pointers to ints per row, on separate channels (at and gc))
    for {
        select {
        case tmp_at := <-resChA:
            tmp_gc := <-resChB // Ch A and B go in pairs anyway
            A += *tmp_at       // sum of A's and T's
            B += *tmp_gc       // sum of G's and C's
        case <-quit:
            // Each goroutine sends "quit" signals when it's done. Since
            // the number of goroutines equals the Cpu counter, we count
            // down each time a goroutine tells us it's done (quit at 0):
            CpuCnt--
            if CpuCnt == 0 { // When all goroutines are done then we're done.
                goto out     
            }
        }
    }
out:
    // Print report to screen
}

Why does this code count consistently only when executed on a single CPU/goroutine? That is, do the channels fail to sync, or does the main loop quit forcefully before all goroutines are done? Scratching my head.

(Again: See/run the full code at the playground: http://play.golang.org/p/PT5jeCKgBv )

// Rolf Lampa

4

1 回答 1

3

这是一个工作版本,无论使用多少 CPU,它都能始终产生相同的结果。

这是我所做的

  • 不再通过通道传递 *int —— 在通道中传递指针极易产生数据竞争(racy)!
  • 不再传递 *[]byte —— 毫无意义,因为切片本身就是引用类型
  • 在将切片放入通道之前先复制切片 —— 多个切片指向同一块内存会导致竞争
  • 修复 Worker 中 at、gc 的初始化 —— 它们原先放在了错误的位置,这是结果不一致的主要原因
  • 使用 sync.WaitGroup 进行同步,并用 close() 关闭通道

我使用 go build 的 -race 参数来查找和修复数据竞争。

package main

import (
    "bufio"
    "fmt"
    "runtime"
    "strings"
    "sync"
)

// Worker drains inCh until the channel is closed. For every slice received it
// tallies the 'A'/'T' bytes and the 'G'/'C' bytes, then sends the first tally
// on resA followed by the second on resB. wg.Done is called when it returns.
func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker started...")
    for line := range inCh {
        // Fresh tallies for each line, so every result pair describes one line.
        var atCount, gcCount int
        for _, base := range line {
            switch base {
            case 'A', 'T':
                atCount++
            case 'G', 'C':
                gcCount++
            }
        }
        resA <- atCount
        resB <- gcCount
    }
}

// SpawnWork generates the artificial input and sends it on inStr one line at
// a time, then closes inStr so that receivers ranging over it terminate.
//
// Empty lines and lines starting with '>' are skipped. Every line is copied
// before it is sent, because the slice returned by scanner.Bytes may be
// overwritten by the next call to Scan.
func SpawnWork(inStr chan<- []byte) {
    fmt.Println("Spawning work:")
    // An artificial input source.
    const block = "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
        "CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" +
        "TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" +
        "AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" +
        "TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" +
        "TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n"
    // Expand the data: the block itself plus 1000 more copies. A single
    // Repeat avoids re-copying the growing string on every iteration.
    data := strings.Repeat(block, 1001)

    scanner := bufio.NewScanner(strings.NewReader(data))
    scanner.Split(bufio.ScanLines)

    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        }
        // Send a private copy; s aliases the scanner's internal buffer.
        inStr <- append([]byte(nil), s...)
    }
    if err := scanner.Err(); err != nil {
        fmt.Println("SpawnWork: reading input:", err)
    }
    close(inStr)
}

// main starts one Worker per CPU and a producer goroutine, sums the per-line
// results the workers send back, and prints a report.
//
// Shutdown order: SpawnWork closes inStr when the input is exhausted, which
// ends each Worker's range loop. Once every Worker has called wg.Done the
// producer goroutine closes both result channels, which ends the summing loop.
func main() {
    CpuCnt := runtime.NumCPU()
    runtime.GOMAXPROCS(CpuCnt)
    fmt.Printf("Processors: %d\n", CpuCnt)

    resChA := make(chan int)
    resChB := make(chan int)
    inStr := make(chan []byte)

    fmt.Println("Spawning workers:")
    var wg sync.WaitGroup
    for i := 0; i < CpuCnt; i++ {
        wg.Add(1)
        go Worker(inStr, resChA, resChB, &wg)
    }
    fmt.Println("Spawning work:")
    go func() {
        SpawnWork(inStr)
        // Only close the result channels after every Worker has finished
        // sending; closing earlier would make a Worker send panic.
        wg.Wait()
        close(resChA)
        close(resChB)
    }()

    A := 0
    B := 0
    LineCnt := 0
    for atCount := range resChA {
        // These go together: a Worker sends on resChB right after resChA.
        gcCount := <-resChB
        A += atCount
        B += gcCount
        LineCnt++
    }

    if A+B <= 0 {
        fmt.Println("No A/B was found!")
        return
    }
    ABFraction := float32(B) / float32(A+B)
    fmt.Println("\n----------------------------")
    fmt.Printf("Cpu's  : %d\n", CpuCnt)
    fmt.Printf("Lines  : %d\n", LineCnt)
    fmt.Printf("A+B    : %d\n", A+B)
    fmt.Printf("A      : %d\n", A)
    fmt.Printf("B      : %d\n", B) // was printing A under the "B" label
    fmt.Printf("AB frac: %v\n", ABFraction*100)
    fmt.Println("----------------------------")
}
于 2013-06-14T07:41:39.733 回答