我有一个简单的应用程序,我正在研究它来读取 MongoDB 的复制 oplog,将结果序列化为 Go 结构并将其发送到要处理的通道。目前我正在从该通道读取并简单地打印出结构内部的值。
我尝试使用 for/range 从通道中读取值,直接从通道中简单读取,然后将其放入具有超时的选择中。结果都是一样的。每次我运行代码时,我都会从通道中得到不同的结果。每次写入通道时,我都会看到一次,但是从该通道读取时,我有时会读出相同的值 1-3 有时甚至 4 次,即使只有一次写入也是如此。
这通常只发生在初始加载(拉入较旧的记录)时,并且在读取频道的实时添加时似乎不会发生。在第一次读取项目之前从通道中读取太快是否存在一些问题?
package main
import (
"fmt"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
)
type Operation struct {
Id int64 `bson:"h" json:"id"`
Operator string `bson:"op" json:"operator"`
Namespace string `bson:"ns" json:"namespace"`
Select bson.M `bson:"o" json:"select"`
Update bson.M `bson:"o2" json:"update"`
Timestamp int64 `bson:"ts" json:"timestamp"`
}
func Tail(collection *mgo.Collection, Out chan<- *Operation) {
iter := collection.Find(nil).Tail(-1)
var oper *Operation
for {
for iter.Next(&oper) {
fmt.Println("\n<<", oper.Id)
Out <- oper
}
if err := iter.Close(); err != nil {
fmt.Println(err)
return
}
}
}
func main() {
session, err := mgo.Dial("127.0.0.1")
if err != nil {
panic(err)
}
defer session.Close()
c := session.DB("local").C("oplog.rs")
cOper := make(chan *Operation, 1)
go Tail(c, cOper)
for operation := range cOper {
fmt.Println()
fmt.Println("Id: ", operation.Id)
fmt.Println("Operator: ", operation.Operator)
fmt.Println("Namespace: ", operation.Namespace)
fmt.Println("Select: ", operation.Select)
fmt.Println("Update: ", operation.Update)
fmt.Println("Timestamp: ", operation.Timestamp)
}
}