2

我想使用从 for 循环调用的 goroutine 加载一些 json 文件(“.json”)。我想让加载并行化(在加载其他文件的同时处理第一个文件)。

Q1。由于文件的数量可能会有所不同(要添加的新文件),我将使用带有文件名的(文件)列表(仅在此示例中自动生成名称),因此我想使用 for 循环。最佳?

Q2。什么是最有效的渠道使用。

第三季度。如果每个加载操作都需要一个唯一的通道(如下面的示例代码),我将如何定义通道?

示例代码(要压缩并能够使用文件名列表加载文件):


func load_json(aChan chan byte, s string) {
    // load "filename" + s + ".json"
    // confirm to the channel
    aChan <- 0
}

func do_stuff() {
    // .. with the newly loaded json
}

func Main() {
    chan_A := make(chan byte)
    go load_json(chan_A, "_classA")

    chan_B := make(chan byte)
    go load_json(chan_B, "_classB")

    chan_C := make(chan byte)
    go load_json(chan_C, "_classC")

    chan_D := make(chan byte)
    go load_json(chan_D, "_classD")


    <-chan_A
        // Now, do stuff with Class A
    <-chan_B
        // etc...
    <-chan_C
    <-chan_D
    fmt.Println("Done.")
}

编辑: 我根据“Tom”建议的想法设计了一个简化的测试解决方案(见下文)。就我而言,我将任务分为三个阶段,每个阶段使用一个通道来控制执行。但是,我倾向于使用此代码陷入死锁(请参阅执行结果和代码下方的注释)。

在PlayGround上运行此代码。

如何避免此代码中的死锁?:

type TJsonFileInfo struct {
    FileName string
}
type TChannelTracer struct {  // Will count & display visited phases A, B, C
    A, B, C int
}
var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    var newFileInfo TJsonFileInfo
    newFileInfo.FileName = aFileName
    // file, e := ioutil.ReadFile(newFileInfo.FileName)...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.B += 1
    fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
    aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.C += 1
    fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
    aDoneQueueChan <- FileInfo
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
        go UnmarshalFile(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    }

    for {
        select {
        case result := <-marshalChan:
            result.FileName = result.FileName // dummy use
        case result := <-processChan:
            result.FileName = result.FileName // dummy use
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json 
C. Processed file: ./files/classC.json 
Done. Channels visited: {3 2 2}     // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

请注意,此代码不会访问文件系统,因此它应该在 PlayGround 上运行

EDIT2 : - 除了不安全的“ChannelTracer”之外,我只能通过使用与文件任务相同次数的 doneProcessingChannel 来避免死锁。
在此处运行代码:Playground

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    //  Read doneProcessingChan equal number of times
    //  as the spawned tasks (files) above :
    for i := 0; i < len(jsonFileList); i++ {
        <-doneProcessingChan
        fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
    }
}

// RIL

4

2 回答 2

2

基于@BraveNewCurrency答案,我为您编写了一个简单的示例程序

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

type Result struct {
    Some    string
    Another string
    AndAn   int
}

func generateWork(work chan *os.File) {
    files := []string{
        "/home/foo/a.json",
        "/home/foo/b.json",
        "/home/foo/c.json",
    }
    for _, path := range files {
        file, e := os.Open(path)
        if e != nil {
            panic(e)
        }
        work <- file
    }
}

func processWork(work chan *os.File, done chan Result) {
    file := <-work
    decoder := json.NewDecoder(file)
    result := Result{}
    decoder.Decode(&result)
    done <- result
}

func main() {
    work := make(chan *os.File)
    go generateWork(work)
    done := make(chan Result)
    for i := 0; i < 100; i++ {
        go processWork(work, done)
    }
    for {
        select {
        case result := <-done:
            // a result is available
            fmt.Println(result)
        }
    }
}

请注意,该程序无法在操场上运行,因为那里不允许文件系统访问。

编辑:

为了回答您问题中的版本,我采用了代码并更改了一些小东西

package main

import (
    _ "encoding/json"
    "fmt"
    _ "io/ioutil"
    _ "os"
)

type TJsonMetaInfo struct {
    MetaSystem string
}

type TJsonFileInfo struct {
    FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
    A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    newFileInfo := TJsonFileInfo{aFileName}
    // file, e := ioutil.ReadFile(newFileInfo.FileName)
    // etc...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.B += 1
        fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
        aResultQueueChan <- FileInfo
    }
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    for {
        FileInfo := <-aWorkQueueChan
        ChannelTracer.C += 1
        fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
        aDoneQueueChan <- FileInfo

    }
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }

    for {
        select {
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}

请注意,此代码仍然死锁,但最后,当所有工作完成时,在最后一个空for循环中main()

另请注意,这些行:

ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1

不是并发安全的。这意味着在多线程环境中,一个 goroutine 和另一个 goroutine 可能会尝试同时增加同一个计数器,从而导致错误计数。要解决此问题,请查看以下软件包:

于 2013-06-01T17:31:09.317 回答
0

您应该以这种方式构建您的程序:

1)主程序为“工作要做”创建一个通道,并且可能为“完成的工作”创建一个通道(两个通道都应该有一些缓冲)

2)分拆一个goroutine来生成文件列表并将它们放在“工作要做”频道中。

3) 启动 N 个 goroutine(在一个 for 循环中)来处理文件。该例程将从“待办事项”通道读取文件,对其进行处理,并将响应发送到“完成工作”通道。

4)主程序等待“完成工作”并打印它们或其他任何东西。

上面的最佳“N”因问题而异 - 如果您的工作受 CPU 限制,则最佳 N 应该与系统中的处理器数量有关。- 如果你的工作是磁盘绑定的,性能实际上可能会随着 N 的增加而下降,因为多个工作人员会导致更多的随机 I/O。- 如果您的工作从许多远程计算机中提取文件(想想网络爬虫),那么最佳 N 可能会非常高(100 甚至 1000)。

于 2013-06-01T13:29:39.500 回答