我编写了一个简单的小示例,将 1000 万条记录插入 mongodb。我首先让它按顺序工作。然后我查找了如何进行并发,并找到了 goroutines。这似乎是我想要的,但它的行为并不像我预期的那样。我实现了一个 WaitGroup 来阻止程序在所有 goroutine 完成之前退出,但我仍然遇到问题。
所以我将从正在发生的事情开始,然后显示代码。当我在没有 goroutine 的情况下运行代码时,所有 1000 万条记录都可以插入到 mongodb 中。但是,当我添加 goroutine 时,会输入一些不确定的数量.. 通常大约 8500 给或取几百。我检查了 mongodb 日志以查看它是否有问题并且没有任何显示。所以我不确定是不是这样,可能是,只是没有被记录。无论如何,这是代码:
(旁注:我一次只做 1 条记录,但我已经把它分成了一个方法,所以我可以在将来一次测试多条记录。只是还没弄清楚如何用 mongodb然而。)
package main
import (
"fmt"
"labix.org/v2/mgo"
"strconv"
"time"
"sync"
)
// structs
type Reading struct {
Id string
Name string
}
var waitGroup sync.WaitGroup
// methods
func main() {
// Setup timer
startTime := time.Now()
// Setup collection
collection := getCollection("test", "readings")
fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Setup readings
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Insert readings
for i := 1; i <= 1000000; i++ {
waitGroup.Add(1)
go insertReadings(collection, readings)
// fmt.Print(".")
if i % 1000 == 0 {
fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func getCollection(databaseName string, tableName string) *mgo.Collection {
session, err := mgo.Dial("localhost")
if err != nil {
// panic(err)
fmt.Println("error getCollection:", err)
}
// defer session.Close()
// Optional. Switch the session to a monotonic behavior.
// session.SetMode(mgo.Monotonic, true)
collection := session.DB(databaseName).C(tableName)
return collection
}
func insertReadings(collection *mgo.Collection, readings []Reading) {
err := collection.Insert(readings)
if err != nil {
// panic(err)
fmt.Println("error insertReadings:", err)
}
waitGroup.Done()
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 1; i++ {
readings = append(readings, Reading{Name: "Thing"})
}
return readings
}