3

我有一个简单的应用程序,我正在研究它来读取 MongoDB 的复制 oplog,将结果序列化为 Go 结构并将其发送到要处理的通道。目前我正在从该通道读取并简单地打印出结构内部的值。

我尝试使用 for/range 从通道中读取值,直接从通道中简单读取,然后将其放入具有超时的选择中。结果都是一样的。每次我运行代码时,我都会从通道中得到不同的结果。每次写入通道时,我都会看到一次,但是从该通道读取时,我有时会读出相同的值 1-3 有时甚至 4 次,即使只有一次写入也是如此。

这通常只发生在初始加载(拉入较旧的记录)时,并且在读取频道的实时添加时似乎不会发生。在第一次读取项目之前从通道中读取太快是否存在一些问题?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}
4

2 回答 2

5

我认为您正在重用*Operation导致问题的您。例如:

http://play.golang.org/p/_MeSBLWPwN

c := make(chan *int, 1)

go func() {
    val := new(int)
    for i :=0; i<10; i++ {
        *val = i
        c <- val
    }
    close(c)
}()


for val := range c {
    time.Sleep(time.Millisecond * 1)
    fmt.Println(*val)
}

此代码导致:

2
3
4
5
6
7
8
9
9
9

更重要的是它不是线程安全的。尝试这样做:

for {
    for { 
        var oper *Operation
        if !iter.Next(&oper) {
            break
        }
        fmt.Println("\n<<", oper.Id)
        Out <- oper
    }
    ...
}

或者使用普通Operation的而不是*Operation. (因为没有指针,值被复制)

于 2014-01-31T21:27:31.830 回答
3

我认为您正在做的是每次反序列化到结构的同一个实例中,因此具有由通道读取并由发送者重写的相同对象。尝试简单地将它的初始化移动到循环中,这样您每次都会创建一个新的。

您也可以使用go run -raceor运行此代码go build -race,它会警告此类内容。

于 2014-01-31T21:28:19.567 回答