0
func main() {
    jobs := []Job{job1, job2, job3}
    numOfJobs := len(jobs)
    resultsChan := make(chan *Result, numOfJobs)
    jobChan := make(chan *job, numOfJobs)
    go consume(numOfJobs, jobChan, resultsChan)
    for i := 0; i < numOfJobs; i++ {
        jobChan <- jobs[i]
    }
    close(jobChan)

    for i := 0; i < numOfJobs; i++ {
        <-resultsChan
    }
    close(resultsChan)
}

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
    for i := 0; i < num; i++ {
        go func() {
            job := <-jobChan
            resultsChan <- doJob(job)
        }()
    }
}

在上面的示例中,作业被推送到 jobChan 中,goroutines 会将其从 jobChan 中拉出并同时执行作业并将结果推送到 resultsChan 中。然后我们将从 resultsChan 中提取结果。

问题一:

在我的代码中,没有序列化/线性化的结果。尽管作业按作业 1、作业 2、作业 3 的顺序排列。结果可能会显示为 job3、job1、job2,具体取决于哪一个耗时最长。

我仍然想同时执行这些作业,但是,我需要确保结果从 resultsChan 中出来的顺序与它作为作业进入的顺序相同。

问题2:

我有大约 30 万个工作,这意味着代码将生成多达 30 万个 goroutine。拥有这么多 goroutine 是否有效,或者我最好将这些工作组合成 100 个左右的切片,让每个 goroutine 经历 100 个而不是 1 个。

4

1 回答 1

1

这是我处理序列化的一种方式(并且还设置了有限数量的工人)。我设置了一些带有输入和输出字段以及同步通道的工作对象,然后我循环遍历它们,拾取他们所做的任何工作并给他们一个新工作。然后我最后一次通过它们,以拾取剩余的任何已完成的工作。请注意,您可能希望工作人员数量稍微超过您的核心数量,这样即使有一项异常长的工作,您也可以让所有资源保持忙碌状态。代码位于http://play.golang.org/p/PM9y4ieMxw及以下。

这是毛茸茸的(比我在坐下来写一个例子之前记得的毛茸茸!)——很想看看其他人有什么,要么只是更好的实现,要么是完全不同的方式来实现你的目标。

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "time"
)

type Worker struct {
    in     int
    out    int
    inited bool

    jobReady chan bool
    done     chan bool
}

func (w *Worker) work() {
    time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
    w.out = w.in + 1000
}
func (w *Worker) listen() {
    for <-w.jobReady {
        w.work()
        w.done <- true
    }
}
func doSerialJobs(in chan int, out chan int) {
    concurrency := 23
    workers := make([]Worker, concurrency)
    i := 0
    // feed in and get out items
    for workItem := range in {
        w := &workers[i%
            concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        } else {
            w.jobReady = make(chan bool)
            w.done = make(chan bool)
            w.inited = true
            go w.listen()
        }
        w.in = workItem
        w.jobReady <- true
        i++
    }
    // get out any job results left over after we ran out of input
    for n := 0; n < concurrency; n++ {
        w := &workers[i%concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        }
        close(w.jobReady)
        i++
    }
    close(out)
}
func main() {
    runtime.GOMAXPROCS(10)
    in, out := make(chan int), make(chan int)
    allFinished := make(chan bool)
    go doSerialJobs(in, out)
    go func() {
        for result := range out {
            fmt.Println(result)
        }
        allFinished <- true
    }()
    for i := 0; i < 100; i++ {
        in <- i
    }
    close(in)
    <-allFinished
}

请注意,在此示例中,只有inout传输实际数据——所有其他通道仅用于同步。

于 2014-01-07T19:33:35.510 回答