0

我正在从运行的主 mongodb 实例中以 json 格式存储 oplog

[{"Timestamp":6477723955623886852,"HistoryID":166676398345289971,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9e8489535b1848d32d","num":9795},"QueryObject":null},{"Timestamp":6477723955623886853,"HistoryID":334415645635509829,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9e8489535b1848d32e","num":6183},"QueryObject":null},{"Timestamp":6477723959918854145,"HistoryID":-8753067027179338671,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d32f","num":6439},"QueryObject":null},{"Timestamp":6477723959918854146,"HistoryID":4170487939316826112,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d330","num":5699},"QueryObject":null},{"Timestamp":6477723959918854147,"HistoryID":-869729083312840671,"MongoVersion":2,"Operation":"i","NameSpace":"test.tests","Object":{"__v":0,"_id":"59e57f9f8489535b1848d331","num":8779},"QueryObject":null}]

但是我无法找到任何方法将这些操作日志推送到另一个主要的 mongodb 实例中并重播它们,以便可以同步两个数据库

这是我为重放操作日志而编写的代码

package operations

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "reflect"
    "time"

    "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

type OlogQuery struct {
    Session *mgo.Session
    Query   bson.M
    Timeout time.Duration
}

type Olog struct {
    ts bson.MongoTimestamp `bson:"ts"`
    h  float64             `bson:"h"`
    v  int                 `bson:"v"`
    op string              `bson:"op"`
    ns string              `bson:"ns"`
    o  bson.M              `bson:"o"`
    o2 bson.M              `bson:"o2"`
}

type Obj struct {
    data map[string]Olog
}

func exists(path string) (bool, error) {
    _, err := os.Stat(path)
    if err == nil {
    return true, nil
}
if os.IsNotExist(err) {
    return false, nil
}
return true, err
}

var Written = make(chan bool, 1)

func ReplayOpLogs(dest *mgo.Session) bool {

dirExists, _ := exists("oplogs")
if !dirExists {
     fmt.Println("There are no oplogs to replay")
     return false
    }
    files, err := ioutil.ReadDir("./oplogs")
     if err != nil {
     log.Fatal(err)
    }

for _, f := range files {
    raw, err := ioutil.ReadFile("./oplogs/" + f.Name())
    if err != nil {
        fmt.Println(err.Error())
        os.Exit(1)
    }
    var ol []map[string]interface{}
    //var ol interface{}
    if err := json.Unmarshal([]byte(raw), &ol); err != nil {
        panic(err)
    }
    for x := 0; x < len(ol); x++ {
        v := reflect.ValueOf(ol[x])
        if v.Kind() == reflect.Map {

            for key := range ol[x] {
                var temp Olog

                switch key {
                case "Timestamp":
                    temp.ts, _ = ol[x][key].(bson.MongoTimestamp)
                case "HistoryID":
                    temp.h, _ = ol[x][key].(float64)
                case "MongoVersion":
                    temp.v, _ = ol[x][key].(int)
                case "Operation":
                    temp.op, _ = ol[x][key].(string)
                case "NameSpace":
                    temp.ns, _ = ol[x][key].(string)
                case "Object":
                    temp.o, _ = ol[x][key].(bson.M)
                case "QueryObject":
                    temp.o2, _ = ol[x][key].(bson.M)
                }

                var olq OlogQuery
                olq.Session = dest        //Destination Session
                olq.Query = bson.M{"ts": bson.M{"$gt": LastTime(dest)}}
                olq.Timeout = 1 * time.Second
                db := olq.Session.DB("local")
                collection := db.C("oplog.rs")
                b, err := bson.Marshal(temp)
                if err != nil {
                    panic(err)
                }
                collection.Insert(b)
                collection.Find(olq.Query).LogReplay()
            }

        }

    }

}

return true

 }
4

1 回答 1

0

使用mongo-connector来做这个怎么样?使用连接器,您可以将一个 opLog 复制到另一个 mongodb 主数据库。

于 2017-10-19T05:31:21.070 回答