2

我试图同时从 S3 下载多个文件,并将它们的内容合并到一个字节缓冲区中。这些文件是 csv 格式的。我的代码似乎大部分时间都在工作(10 次尝试中的 8 次)。但在某些情况下,在我检查了合并缓冲区后,我得到的比我应该得到的要少(通常不超过 100 行丢失)。预期的记录总数为 4802。如果按顺序运行我的代码,则不会出现此问题。但我需要使用 goroutines 来提高速度。这是我尝试做的主要要求。我已经运行了 go data race 检查器没有出现数据竞争,并且我打印的错误语句永远不会打印出来。

这是我使用的代码:

    var pingsBuffer = aws.NewWriteAtBuffer([]byte{}) 
        //range over the contents of the index file
    for _, file := range indexList {
        wg.Add(1)
        go download(key + string(file), pingsBuffer, &wg)
    }
    wg.Wait()

和下载功能(也整合下载的文件)

func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup)  {

defer wg.Done()

awsBuffer := aws.NewWriteAtBuffer([]byte{})

input := &s3.GetObjectInput {
    Bucket: aws.String(defaultLocationRootBucket),
    Key:    aws.String(key),
}

n1, downloadError := downloader.Download(awsBuffer, input)
if downloadError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
    return
}


lenghts3:= int64(len(buffer.Bytes()))

n2, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3 )
if bufferError != nil {
    loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3  with error : %v.", key, bufferError))
}
4

1 回答 1

2

这段代码:

lenghts3:= int64(len(buffer.Bytes()))

是一个并发问题:两个例程可能同时获取长度,获取相同的起始位置,并且都以相同的起始位置继续写入缓冲区,互相踩到对方的脚趾。

由于您已经在内存中检索整个对象而不是流式传输到组合缓冲区,因此您也可以在通道上发送每个文件的完整内容,并让该通道上的接收器将每个结果附加到共享字节缓冲区他们同步进来。

于 2017-06-26T17:18:05.377 回答