1

对于我们正在使用 go 的作业,我们要做的一件事是逐行解析 uniprotdatabasefile 以收集 uniprot 记录。

我不想共享太多代码,但我有一个工作代码片段可以在 48 秒内正确解析这样的文件(2.5 GB)(使用时间 go-package 测量)。它迭代地解析文件并向记录添加行,直到达到记录结束信号(完整记录),并创建记录上的元数据。然后将记录字符串清空,并逐行收集新记录。然后我想我会尝试使用 go-routines。

我之前从 stackoverflow 得到了一些提示,然后在原始代码中我简单地添加了一个函数来处理有关元数据创建的所有内容。

所以,代码正在做

  1. 创建一个空记录,
  2. 迭代文件并向记录添加行,
  3. 如果找到记录停止信号(现在我们有完整记录) - 将其交给 go 例程以创建元数据
  4. 将记录字符串设为空并从 2) 继续。

我还添加了一个sync.WaitGroup()以确保我(最终)等待每个例程完成。我认为这实际上会减少解析数据库文件所花费的时间,因为它会继续解析,而 goroutine 会作用于每条记录。但是,代码似乎运行了 20 多分钟,这表明出现了问题或开销变得疯狂。有什么建议么?

package main

import (
    "bufio"
    "crypto/sha1"
    "fmt"
    "io"
    "log"
    "os"
    "strings"
    "sync"
    "time"
)

type producer struct {
    parser uniprot
}

type unit struct {
    tag string
}

type uniprot struct {
    filenames     []string
    recordUnits   chan unit
    recordStrings map[string]string
}

func main() {
    p := producer{parser: uniprot{}}
    p.parser.recordUnits = make(chan unit, 1000000)
    p.parser.recordStrings = make(map[string]string)
    p.parser.collectRecords(os.Args[1])
}

func (u *uniprot) collectRecords(name string) {
    fmt.Println("file to open ", name)
    t0 := time.Now()
    wg := new(sync.WaitGroup)
    record := []string{}
    file, err := os.Open(name)
    errorCheck(err)
    scanner := bufio.NewScanner(file)
    for scanner.Scan() { //Scan the file
        retText := scanner.Text()
        if strings.HasPrefix(retText, "//") {
            wg.Add(1)
            go u.handleRecord(record, wg)
            record = []string{}
        } else {
            record = append(record, retText)
        }
    }
    file.Close()
    wg.Wait()
    t1 := time.Now()
    fmt.Println(t1.Sub(t0))
}

func (u *uniprot) handleRecord(record []string, wg *sync.WaitGroup) {
    defer wg.Done()
    recString := strings.Join(record, "\n")
    t := hashfunc(recString)
    u.recordUnits <- unit{tag: t}
    u.recordStrings[t] = recString
}

func hashfunc(record string) (hashtag string) {
    hash := sha1.New()
    io.WriteString(hash, record)
    hashtag = string(hash.Sum(nil))
    return
}

func errorCheck(err error) {
    if err != nil {
        log.Fatal(err)
    }
}
4

1 回答 1

3

首先:您的代码不是线程安全的。主要是因为您正在同时访问哈希图。这些对于 go 中的并发是不安全的,需要被锁定。代码中的错误行:

u.recordStrings[t] = recString

因为当你运行GOMAXPROCS> 1 时这会爆炸,我假设你没有这样做。确保您正在运行您的应用程序GOMAXPROCS=2以实现并行性或更高。默认值为 1,因此您的代码在单个操作系统线程上运行,当然,不能同时在两个 CPU 或 CPU 内核上调度。例子:

$ GOMAXPROCS=2 go run udb.go uniprot_sprot_viruses.dat

最后:从通道中提取值,否则您的程序将不会终止。如果 goroutine 的数量超过了您的限制,您正在创建一个死锁。我测试了一个 76MiB 的数据文件,你说你的文件大约是 2.5GB。我有 16347 个条目。假设线性增长,您的文件将超过 1e6,因此通道中没有足够的插槽,您的程序将死锁,在累积最后不会运行失败的 goroutine 时没有结果(悲惨地)。

因此,解决方案应该是添加一个 go 例程,该例程从通道中提取值并对其进行处理。

附带说明:如果您担心性能,请不要使用字符串,因为它们总是被复制。改为使用[]byte

于 2013-10-09T23:49:15.143 回答