38

我想使用 Go 从 Yahoo Finance 下载股票价格电子表格。我将在其自己的 goroutine 中为每只股票发出一个 http 请求。我有一个大约 2500 个符号的列表,但我宁愿一次发出 250 个请求,而不是并行发出 2500 个请求。在 Java 中,我会创建一个线程池并在线程空闲时重用它们。我试图找到类似的东西,一个 goroutine 池,如果你愿意的话,但找不到任何资源。如果有人能告诉我如何完成手头的任务或为我指出相同的资源,我将不胜感激。谢谢!

4

4 回答 4

63

我想,最简单的方法是创建 250 个 goroutine 并将它们传递给一个通道,您可以使用该通道将链接从主 goroutine 传递到子 goroutine,并监听该通道。

当所有链接都传递给 goroutines 时,您关闭一个通道,所有 goroutines 都会完成它们的工作。

为了在子进程处理数据之前确保自己免受主 goroutine 的影响,您可以使用sync.WaitGroup.

这是我上面所说的一些代码来说明(不是最终的工作版本,但说明了重点):

func worker(linkChan chan string, wg *sync.WaitGroup) {
   // Decreasing internal counter for wait-group as soon as goroutine finishes
   defer wg.Done()

   for url := range linkChan {
     // Analyze value and do the job here
   }
}

func main() {
    lCh := make(chan string)
    wg := new(sync.WaitGroup)

    // Adding routines to workgroup and running then
    for i := 0; i < 250; i++ {
        wg.Add(1)
        go worker(lCh, wg)
    }

    // Processing all links by spreading them to `free` goroutines
    for _, link := range yourLinksSlice {
        lCh <- link
    }

    // Closing channel (waiting in goroutines won't continue any more)
    close(lCh)

    // Waiting for all goroutines to finish (otherwise they die as main routine dies)
    wg.Wait()
}
于 2013-08-16T07:19:17.760 回答
3

Go您可以使用此git repo中的线程池实现库

是关于如何将通道用作线程池的不错的博客

来自博客的片段

    var (
 MaxWorker = os.Getenv("MAX_WORKERS")
 MaxQueue  = os.Getenv("MAX_QUEUE")
)

//Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
} 
于 2017-10-04T13:21:00.137 回答
2

此示例使用两个通道,一个用于输入,另一个用于输出。Worker 可以缩放到任何大小,每个 goroutine 在输入队列上工作并将所有输出保存到输出通道。非常欢迎对更简单的方法提供反馈。

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func worker(input chan string, output chan string) {
    defer wg.Done()
    // Consumer: Process items from the input channel and send results to output channel
    for value := range input {
        output <- value + " processed"
    }
}

func main() {
    var jobs = []string{"one", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two"}
    input := make(chan string, len(jobs))
    output := make(chan string, len(jobs))
    workers := 250

    // Increment waitgroup counter and create go routines
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go worker(input, output)
    }

    // Producer: load up input channel with jobs
    for _, job := range jobs {
        input <- job
    }

    // Close input channel since no more jobs are being sent to input channel
    close(input)
    // Wait for all goroutines to finish processing
    wg.Wait()
    // Close output channel since all workers have finished processing
    close(output)

    // Read from output channel
    for result := range output {
        fmt.Println(result)
    }

}
于 2018-04-12T14:27:59.767 回答
1

你可以看看这个

我们在 go 中创建了一个线程池,并将其用于我们的生产系统。

我从这里参考了

它使用起来非常简单,并且还有一个 prometheus 客户端,可以告诉您使用了多少个工人。

初始化只需创建一个调度程序的实例

dispatcher = workerpool.NewDispatcher(
    "DispatcherName",
    workerpool.SetMaxWorkers(10),
)

创建一个实现此接口的对象(比如说job )。所以它应该实现 Process 方法

// IJob : Interface for the Job to be processed
type IJob interface {
    Process() error
}

然后只需将作业发送给调度员

dispatcher.JobQueue <- job //object of job

就是这个。

于 2020-06-04T06:03:17.443 回答